* Minor refactoring and stop action handler scaffolding

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Initial implementation of Stop action

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Add e2e test for Stop action

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Minor changes after self review

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Remove unnecessary conditional

uuid.Parse() with an empty string returns an error.

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Don't mark a completed VDiff as stopped

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Need to use withDDL.ExecIgnore in the vdiff retry loop

to avoid potentially doing DDL during tablet state transitions
which can cause the transition to be slow or even hang.

Signed-off-by: Matt Lord <mattalord@gmail.com>
This commit is contained in:
Matt Lord 2022-07-26 17:26:17 -04:00 коммит произвёл GitHub
Родитель 12275cda4e
Коммит a9a8dd7988
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 408 добавлений и 295 удалений

Просмотреть файл

@ -37,6 +37,7 @@ type testCase struct {
resume bool // test resume functionality with this workflow
// If testing resume, what new rows should be diff'd. These rows must have a PK > all initial rows and retry rows.
resumeInsert string
stop bool // test stop functionality with this workflow
testCLIErrors bool // test CLI errors against this workflow (only needs to be done once)
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
}
@ -79,6 +80,7 @@ var testCases = []*testCase{
retryInsert: `insert into customer(cid, name, typ) values(93234, 'Testy McTester Jr', 'enterprise'), (94234, 'Testy McTester II', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(95234, 'Testy McTester III', 'enterprise')`,
stop: true,
},
{
name: "Reshard/merge 3 to 1",
@ -93,6 +95,7 @@ var testCases = []*testCase{
retryInsert: `insert into customer(cid, name, typ) values(96234, 'Testy McTester IV', 'enterprise')`,
resume: true,
resumeInsert: `insert into customer(cid, name, typ) values(97234, 'Testy McTester V', 'enterprise'), (98234, 'Testy McTester VI', 'enterprise')`,
stop: true,
},
}
@ -185,6 +188,9 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
}
// These are done here so that we have a valid workflow to test the commands against
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
}
if tc.testCLICreateWait {
testCLICreateWait(t, ksWorkflow, allCellNames)
}
@ -279,6 +285,20 @@ func testResume(t *testing.T, tc *testCase, cells string) {
})
}
func testStop(t *testing.T, ksWorkflow, cells string) {
t.Run("Stop", func(t *testing.T) {
// create a new VDiff and immediately stop it
uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "create", "", false)
_, _ = performVDiff2Action(t, ksWorkflow, cells, "stop", uuid, false)
// confirm the VDiff is in the expected stopped state
_, output := performVDiff2Action(t, ksWorkflow, cells, "show", uuid, false)
jsonOutput := getVDiffInfo(output)
require.Equal(t, "stopped", jsonOutput.State)
// confirm that the context cancelled error was also cleared
require.False(t, strings.Contains(output, `"Errors":`))
})
}
func testAutoRetryError(t *testing.T, tc *testCase, cells string) {
t.Run("Auto retry on error", func(t *testing.T) {
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)

153
go/vt/proto/tabletmanagerdata/tabletmanagerdata.pb.go сгенерированный
Просмотреть файл

@ -4799,12 +4799,12 @@ type VDiffRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"`
Workflow string `protobuf:"bytes,2,opt,name=workflow,proto3" json:"workflow,omitempty"`
Command string `protobuf:"bytes,3,opt,name=command,proto3" json:"command,omitempty"`
SubCommand string `protobuf:"bytes,4,opt,name=sub_command,json=subCommand,proto3" json:"sub_command,omitempty"`
VdiffUuid string `protobuf:"bytes,5,opt,name=vdiff_uuid,json=vdiffUuid,proto3" json:"vdiff_uuid,omitempty"`
Options *VDiffOptions `protobuf:"bytes,6,opt,name=options,proto3" json:"options,omitempty"`
Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"`
Workflow string `protobuf:"bytes,2,opt,name=workflow,proto3" json:"workflow,omitempty"`
Action string `protobuf:"bytes,3,opt,name=action,proto3" json:"action,omitempty"`
ActionArg string `protobuf:"bytes,4,opt,name=action_arg,json=actionArg,proto3" json:"action_arg,omitempty"`
VdiffUuid string `protobuf:"bytes,5,opt,name=vdiff_uuid,json=vdiffUuid,proto3" json:"vdiff_uuid,omitempty"`
Options *VDiffOptions `protobuf:"bytes,6,opt,name=options,proto3" json:"options,omitempty"`
}
func (x *VDiffRequest) Reset() {
@ -4853,16 +4853,16 @@ func (x *VDiffRequest) GetWorkflow() string {
return ""
}
func (x *VDiffRequest) GetCommand() string {
func (x *VDiffRequest) GetAction() string {
if x != nil {
return x.Command
return x.Action
}
return ""
}
func (x *VDiffRequest) GetSubCommand() string {
func (x *VDiffRequest) GetActionArg() string {
if x != nil {
return x.SubCommand
return x.ActionArg
}
return ""
}
@ -5701,77 +5701,76 @@ var file_tabletmanagerdata_proto_rawDesc = []byte{
0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x51, 0x75, 0x65, 0x72,
0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22,
0xdb, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0xd7, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x1a, 0x0a, 0x08, 0x6b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x08, 0x6b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x1a, 0x0a, 0x08,
0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08,
0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d,
0x61, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, 0x6d,
0x61, 0x6e, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x76, 0x64, 0x69, 0x66, 0x66, 0x5f, 0x75, 0x75, 0x69,
0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x76, 0x64, 0x69, 0x66, 0x66, 0x55, 0x75,
0x69, 0x64, 0x12, 0x39, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x06, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61,
0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x6a, 0x0a,
0x0d, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e,
0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2a,
0x0a, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12,
0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x52, 0x06, 0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x76, 0x64,
0x69, 0x66, 0x66, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
0x76, 0x64, 0x69, 0x66, 0x66, 0x55, 0x75, 0x69, 0x64, 0x22, 0x79, 0x0a, 0x12, 0x56, 0x44, 0x69,
0x66, 0x66, 0x50, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12,
0x21, 0x0a, 0x0c, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70,
0x65, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x63, 0x65, 0x6c,
0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43,
0x65, 0x6c, 0x6c, 0x12, 0x1f, 0x0a, 0x0b, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x63, 0x65,
0x6c, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74,
0x43, 0x65, 0x6c, 0x6c, 0x22, 0x6a, 0x0a, 0x12, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x70,
0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1b, 0x0a, 0x0a, 0x6f, 0x6e,
0x6c, 0x79, 0x5f, 0x70, 0x5f, 0x6b, 0x5f, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07,
0x6f, 0x6e, 0x6c, 0x79, 0x50, 0x4b, 0x53, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x62, 0x75, 0x67,
0x5f, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x64, 0x65,
0x62, 0x75, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x6f, 0x72, 0x6d,
0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74,
0x22, 0x82, 0x02, 0x0a, 0x10, 0x56, 0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1d, 0x0a,
0x0a, 0x61, 0x75, 0x74, 0x6f, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28,
0x08, 0x52, 0x09, 0x61, 0x75, 0x74, 0x6f, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x19, 0x0a, 0x08,
0x6d, 0x61, 0x78, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07,
0x6d, 0x61, 0x78, 0x52, 0x6f, 0x77, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b,
0x73, 0x75, 0x6d, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b,
0x73, 0x75, 0x6d, 0x12, 0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5f, 0x70, 0x63,
0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x50,
0x63, 0x74, 0x12, 0x27, 0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x73, 0x65,
0x63, 0x6f, 0x6e, 0x64, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x74, 0x69, 0x6d,
0x65, 0x6f, 0x75, 0x74, 0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x19, 0x6d,
0x61, 0x78, 0x5f, 0x65, 0x78, 0x74, 0x72, 0x61, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x74, 0x6f,
0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15,
0x6d, 0x61, 0x78, 0x45, 0x78, 0x74, 0x72, 0x61, 0x52, 0x6f, 0x77, 0x73, 0x54, 0x6f, 0x43, 0x6f,
0x6d, 0x70, 0x61, 0x72, 0x65, 0x22, 0xf2, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72,
0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25,
0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61,
0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x50, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x0c, 0x63, 0x6f, 0x72, 0x65, 0x5f, 0x6f, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x61, 0x62,
0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x61, 0x72, 0x67, 0x18, 0x04,
0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x72, 0x67, 0x12,
0x1d, 0x0a, 0x0a, 0x76, 0x64, 0x69, 0x66, 0x66, 0x5f, 0x75, 0x75, 0x69, 0x64, 0x18, 0x05, 0x20,
0x01, 0x28, 0x09, 0x52, 0x09, 0x76, 0x64, 0x69, 0x66, 0x66, 0x55, 0x75, 0x69, 0x64, 0x12, 0x39,
0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1f, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64,
0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x52, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x6a, 0x0a, 0x0d, 0x56, 0x44, 0x69,
0x66, 0x66, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64,
0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x6f, 0x75,
0x74, 0x70, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x71, 0x75, 0x65,
0x72, 0x79, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06,
0x6f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x76, 0x64, 0x69, 0x66, 0x66, 0x5f,
0x75, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x76, 0x64, 0x69, 0x66,
0x66, 0x55, 0x75, 0x69, 0x64, 0x22, 0x79, 0x0a, 0x12, 0x56, 0x44, 0x69, 0x66, 0x66, 0x50, 0x69,
0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x74,
0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x73, 0x12, 0x1f,
0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x63, 0x65, 0x6c, 0x6c, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x43, 0x65, 0x6c, 0x6c, 0x12,
0x1f, 0x0a, 0x0b, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x63, 0x65, 0x6c, 0x6c, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x43, 0x65, 0x6c, 0x6c,
0x22, 0x6a, 0x0a, 0x12, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x4f,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1b, 0x0a, 0x0a, 0x6f, 0x6e, 0x6c, 0x79, 0x5f, 0x70,
0x5f, 0x6b, 0x5f, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x6f, 0x6e, 0x6c, 0x79,
0x50, 0x4b, 0x53, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x62, 0x75, 0x67, 0x5f, 0x71, 0x75, 0x65,
0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x64, 0x65, 0x62, 0x75, 0x67, 0x51,
0x75, 0x65, 0x72, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x22, 0x82, 0x02, 0x0a,
0x10, 0x56, 0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x75, 0x74,
0x6f, 0x5f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, 0x61,
0x75, 0x74, 0x6f, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x19, 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f,
0x72, 0x6f, 0x77, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x6d, 0x61, 0x78, 0x52,
0x6f, 0x77, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x18,
0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x08, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x73, 0x75, 0x6d, 0x12,
0x1d, 0x0a, 0x0a, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x5f, 0x70, 0x63, 0x74, 0x18, 0x05, 0x20,
0x01, 0x28, 0x03, 0x52, 0x09, 0x73, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x50, 0x63, 0x74, 0x12, 0x27,
0x0a, 0x0f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x73, 0x65, 0x63, 0x6f, 0x6e, 0x64,
0x73, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74,
0x53, 0x65, 0x63, 0x6f, 0x6e, 0x64, 0x73, 0x12, 0x38, 0x0a, 0x19, 0x6d, 0x61, 0x78, 0x5f, 0x65,
0x78, 0x74, 0x72, 0x61, 0x5f, 0x72, 0x6f, 0x77, 0x73, 0x5f, 0x74, 0x6f, 0x5f, 0x63, 0x6f, 0x6d,
0x70, 0x61, 0x72, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x15, 0x6d, 0x61, 0x78, 0x45,
0x78, 0x74, 0x72, 0x61, 0x52, 0x6f, 0x77, 0x73, 0x54, 0x6f, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72,
0x65, 0x22, 0xf2, 0x01, 0x0a, 0x0c, 0x56, 0x44, 0x69, 0x66, 0x66, 0x4f, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x5f, 0x6f, 0x70, 0x74,
0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x74, 0x61, 0x62,
0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56,
0x44, 0x69, 0x66, 0x66, 0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52,
0x0b, 0x63, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e,
0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e,
0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65,
0x70, 0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x72, 0x65, 0x70,
0x6f, 0x72, 0x74, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x30, 0x5a, 0x2e, 0x76, 0x69,
0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67,
0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65,
0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
0x44, 0x69, 0x66, 0x66, 0x50, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x52, 0x0d, 0x70, 0x69, 0x63, 0x6b, 0x65, 0x72, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x12, 0x46, 0x0a, 0x0c, 0x63, 0x6f, 0x72, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d,
0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66,
0x43, 0x6f, 0x72, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0b, 0x63, 0x6f, 0x72,
0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4c, 0x0a, 0x0e, 0x72, 0x65, 0x70, 0x6f,
0x72, 0x74, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x25, 0x2e, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x44, 0x69, 0x66, 0x66, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74,
0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x0d, 0x72, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x4f,
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x30, 0x5a, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73,
0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x6d, 0x61, 0x6e,
0x61, 0x67, 0x65, 0x72, 0x64, 0x61, 0x74, 0x61, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

Просмотреть файл

@ -4292,17 +4292,17 @@ func (m *VDiffRequest) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i--
dAtA[i] = 0x2a
}
if len(m.SubCommand) > 0 {
i -= len(m.SubCommand)
copy(dAtA[i:], m.SubCommand)
i = encodeVarint(dAtA, i, uint64(len(m.SubCommand)))
if len(m.ActionArg) > 0 {
i -= len(m.ActionArg)
copy(dAtA[i:], m.ActionArg)
i = encodeVarint(dAtA, i, uint64(len(m.ActionArg)))
i--
dAtA[i] = 0x22
}
if len(m.Command) > 0 {
i -= len(m.Command)
copy(dAtA[i:], m.Command)
i = encodeVarint(dAtA, i, uint64(len(m.Command)))
if len(m.Action) > 0 {
i -= len(m.Action)
copy(dAtA[i:], m.Action)
i = encodeVarint(dAtA, i, uint64(len(m.Action)))
i--
dAtA[i] = 0x1a
}
@ -6306,11 +6306,11 @@ func (m *VDiffRequest) SizeVT() (n int) {
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.Command)
l = len(m.Action)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.SubCommand)
l = len(m.ActionArg)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
@ -15283,7 +15283,7 @@ func (m *VDiffRequest) UnmarshalVT(dAtA []byte) error {
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Command", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -15311,11 +15311,11 @@ func (m *VDiffRequest) UnmarshalVT(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Command = string(dAtA[iNdEx:postIndex])
m.Action = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SubCommand", wireType)
return fmt.Errorf("proto: wrong wireType = %d for field ActionArg", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
@ -15343,7 +15343,7 @@ func (m *VDiffRequest) UnmarshalVT(dAtA []byte) error {
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SubCommand = string(dAtA[iNdEx:postIndex])
m.ActionArg = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 5:
if wireType != 2 {

Просмотреть файл

@ -91,7 +91,7 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
return usage
}
if action == "" {
return fmt.Errorf("invalid action %s; %s", subFlags.Arg(1), usage)
return fmt.Errorf("invalid action '%s'; %s", subFlags.Arg(1), usage)
}
keyspace, workflowName, err := splitKeyspaceWorkflow(subFlags.Arg(0))
if err != nil {
@ -144,10 +144,10 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
return fmt.Errorf("can only show a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
}
}
case vdiff.ResumeAction:
case vdiff.StopAction, vdiff.ResumeAction:
vdiffUUID, err = uuid.Parse(actionArg)
if err != nil {
return fmt.Errorf("can only resume a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
return fmt.Errorf("can only %s a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", action, keyspace, workflowName)
}
case vdiff.DeleteAction:
switch actionArg {
@ -159,8 +159,9 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
}
}
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
return fmt.Errorf("invalid action '%s'; %s", action, usage)
}
type ErrorResponse struct {
Error string
}
@ -204,11 +205,16 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
if err := displayVDiff2ShowResponse(wr, format, keyspace, workflowName, actionArg, output, *verbose); err != nil {
return err
}
case vdiff.DeleteAction:
displayVDiff2ActionStatusResponse(wr, format, action, vdiff.CompletedState)
case vdiff.StopAction, vdiff.DeleteAction:
uuidToDisplay := ""
if actionArg != vdiff.AllActionArg {
uuidToDisplay = vdiffUUID.String()
}
displayVDiff2ActionStatusResponse(wr, format, uuidToDisplay, action, vdiff.CompletedState)
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
}
return nil
}
@ -447,16 +453,24 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
var tableSummaryMap map[string]vdiffTableSummary
var reports map[string]map[string]vdiff.DiffReport
// Keep a tally of the states across all tables and shards
// Keep a tally of the states across all tables in all shards
tableStateCounts := map[vdiff.VDiffState]int{
vdiff.UnknownState: 0,
vdiff.PendingState: 0,
vdiff.StartedState: 0,
vdiff.StoppedState: 0,
vdiff.ErrorState: 0,
vdiff.CompletedState: 0,
}
// Keep a tally of the summary states across all shards
shardStateCounts := map[vdiff.VDiffState]int{
vdiff.UnknownState: 0,
vdiff.PendingState: 0,
vdiff.StartedState: 0,
vdiff.StoppedState: 0,
vdiff.ErrorState: 0,
vdiff.CompletedState: 0,
}
// Keep a tally of the shards that have been marked as completed
completedShards := 0
// Keep a tally of the approximate total rows to process as we'll use this for our progress report
totalRowsToCompare := int64(0)
var shards []string
@ -487,11 +501,9 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
if le := row.AsString("last_error", ""); le != "" {
summary.Errors[shard] = le
}
// Keep track of how many shards are marked as completed. We check this combined
// Keep track of how many shards are marked as a specific state. We check this combined
// with the shard.table states to determine the VDiff summary state.
if vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", ""))) == vdiff.CompletedState {
completedShards++
}
shardStateCounts[vdiff.VDiffState(strings.ToLower(row.AsString("vdiff_state", "")))]++
}
{ // Global VDiff summary updates that take into account the per table details per shard
@ -559,9 +571,12 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
}
// The global VDiff summary should progress from pending->started->completed with
// error for any table being sticky for the global summary. We should only consider
// the VDiff to be complete if it's completed for every table on every shard.
if tableStateCounts[vdiff.ErrorState] > 0 {
// stopped for any shard and error for any table being sticky for the global summary.
// We should only consider the VDiff to be complete if it's completed for every table
// on every shard.
if shardStateCounts[vdiff.StoppedState] > 0 {
summary.State = vdiff.StoppedState
} else if tableStateCounts[vdiff.ErrorState] > 0 {
summary.State = vdiff.ErrorState
} else if tableStateCounts[vdiff.StartedState] > 0 {
summary.State = vdiff.StartedState
@ -574,7 +589,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
// So we only mark the vdiff for the shard as completed when we've finished processing
// rows from all of the sources -- which is recorded by marking the vdiff done for the
// shard by setting _vt.vdiff.state = completed.
if completedShards == len(shards) {
if shardStateCounts[vdiff.CompletedState] == len(shards) {
summary.State = vdiff.CompletedState
} else {
summary.State = vdiff.StartedState
@ -605,7 +620,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
//endregion
func displayVDiff2ScheduledResponse(wr *wrangler.Wrangler, format string, uuid string, typ vdiff.VDiffAction) {
func displayVDiff2ScheduledResponse(wr *wrangler.Wrangler, format, uuid string, typ vdiff.VDiffAction) {
if format == "json" {
type ScheduledResponse struct {
UUID string
@ -623,17 +638,18 @@ func displayVDiff2ScheduledResponse(wr *wrangler.Wrangler, format string, uuid s
}
}
func displayVDiff2ActionStatusResponse(wr *wrangler.Wrangler, format string, action vdiff.VDiffAction, status vdiff.VDiffState) {
func displayVDiff2ActionStatusResponse(wr *wrangler.Wrangler, format, uuid string, action vdiff.VDiffAction, status vdiff.VDiffState) {
if format == "json" {
type ActionStatusResponse struct {
UUID string `json:"UUID,omitempty"`
Action vdiff.VDiffAction
Status vdiff.VDiffState
}
resp := &ActionStatusResponse{Action: action, Status: status}
resp := &ActionStatusResponse{UUID: uuid, Action: action, Status: status}
jsonText, _ := json.MarshalIndent(resp, "", "\t")
wr.Logger().Printf(string(jsonText) + "\n")
} else {
msg := fmt.Sprintf("VDiff %s status is %s on target shards\n", action, status)
msg := fmt.Sprintf("The %s action for vdiff %s is %s on target shards\n", action, uuid, status)
wr.Logger().Printf(msg)
}
}

Просмотреть файл

@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/google/uuid"
@ -38,6 +37,7 @@ type VDiffAction string //nolint
const (
CreateAction VDiffAction = "create"
ShowAction VDiffAction = "show"
StopAction VDiffAction = "stop"
ResumeAction VDiffAction = "resume"
DeleteAction VDiffAction = "delete"
AllActionArg = "all"
@ -45,7 +45,7 @@ const (
)
var (
Actions = []VDiffAction{CreateAction, ShowAction, ResumeAction, DeleteAction}
Actions = []VDiffAction{CreateAction, ShowAction, StopAction, ResumeAction, DeleteAction}
ActionArgs = []string{AllActionArg, LastActionArg}
)
@ -65,140 +65,22 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat
_, _ = withDDL.Exec(ctx, withddl.QueryToTriggerWithDDL, dbClient.ExecuteFetch, dbClient.ExecuteFetch)
})
var qr *sqltypes.Result
var err error
options := req.Options
action := VDiffAction(strings.ToLower(req.Command))
action := VDiffAction(req.Action)
switch action {
case CreateAction, ResumeAction:
query := fmt.Sprintf(sqlGetVDiffID, encodeString(req.VdiffUuid))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return nil, err
}
recordFound := len(qr.Rows) == 1
if recordFound && action == CreateAction {
return nil, fmt.Errorf("vdiff with UUID %s already exists on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
} else if action == ResumeAction {
if !recordFound {
return nil, fmt.Errorf("vdiff with UUID %s not found on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
}
if resp.Id, err = qr.Named().Row().ToInt64("id"); err != nil {
return nil, fmt.Errorf("vdiff found with invalid id on tablet %v: %w",
vde.thisTablet.Alias, err)
}
}
options, err = vde.fixupOptions(options)
if err != nil {
return nil, err
}
optionsJSON, err := json.Marshal(options)
if err != nil {
return nil, err
}
if action == CreateAction {
query := fmt.Sprintf(sqlNewVDiff,
encodeString(req.Keyspace), encodeString(req.Workflow), "pending", encodeString(string(optionsJSON)),
vde.thisTablet.Shard, topoproto.TabletDbName(vde.thisTablet), req.VdiffUuid)
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return nil, err
}
if qr.InsertID == 0 {
return nil, fmt.Errorf("unable to create vdiff for UUID %s on tablet %v (%w)",
req.VdiffUuid, vde.thisTablet.Alias, err)
}
resp.Id = int64(qr.InsertID)
} else {
query := fmt.Sprintf(sqlResumeVDiff, encodeString(string(optionsJSON)), encodeString(req.VdiffUuid))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return nil, err
}
if qr.RowsAffected == 0 {
msg := fmt.Sprintf("no completed vdiff found for UUID %s on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
if err != nil {
msg = fmt.Sprintf("%s (%v)", msg, err)
}
return nil, fmt.Errorf(msg)
}
}
resp.VdiffUuid = req.VdiffUuid
qr, err := vde.getVDiffByID(ctx, dbClient, resp.Id)
if err != nil {
return nil, err
}
vde.mu.Lock()
defer vde.mu.Unlock()
if err := vde.addController(qr.Named().Row(), options); err != nil {
if err := vde.handleCreateResumeAction(ctx, dbClient, action, req, resp); err != nil {
return nil, err
}
case ShowAction:
vdiffUUID := ""
if req.SubCommand == LastActionArg {
query := fmt.Sprintf(sqlGetMostRecentVDiff, encodeString(req.Keyspace), encodeString(req.Workflow))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return nil, err
}
if len(qr.Rows) == 1 {
row := qr.Named().Row()
vdiffUUID = row.AsString("vdiff_uuid", "")
}
} else {
if uuidt, err := uuid.Parse(req.SubCommand); err == nil {
vdiffUUID = uuidt.String()
}
if err := vde.handleShowAction(ctx, dbClient, action, req, resp); err != nil {
return nil, err
}
if vdiffUUID != "" {
resp.VdiffUuid = vdiffUUID
query := fmt.Sprintf(sqlGetVDiffByKeyspaceWorkflowUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(vdiffUUID))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return nil, err
}
switch len(qr.Rows) {
case 0:
return nil, fmt.Errorf("no vdiff found for UUID %s keyspace %s and workflow %s on tablet %v",
vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias)
case 1:
row := qr.Named().Row()
vdiffID, _ := row["id"].ToInt64()
summary, err := vde.getVDiffSummary(vdiffID, dbClient)
resp.Output = summary
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("too many vdiffs found (%d) for UUID %s keyspace %s and workflow %s on tablet %v",
len(qr.Rows), vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias)
}
}
switch req.SubCommand {
case AllActionArg:
if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil {
return nil, err
}
resp.Output = sqltypes.ResultToProto3(qr)
case LastActionArg:
default:
if _, err := uuid.Parse(req.SubCommand); err != nil {
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
case StopAction:
if err := vde.handleStopAction(ctx, dbClient, action, req, resp); err != nil {
return nil, err
}
case DeleteAction:
query := ""
switch req.SubCommand {
case AllActionArg:
query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow))
default:
uuid, err := uuid.Parse(req.SubCommand)
if err != nil {
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(uuid.String()))
}
if _, err = dbClient.ExecuteFetch(query, 1); err != nil {
if err := vde.handleDeleteAction(ctx, dbClient, action, req, resp); err != nil {
return nil, err
}
default:
@ -247,3 +129,168 @@ func (vde *Engine) fixupOptions(options *tabletmanagerdatapb.VDiffOptions) (*tab
return options, nil
}
func (vde *Engine) handleCreateResumeAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var qr *sqltypes.Result
var err error
options := req.Options
query := fmt.Sprintf(sqlGetVDiffID, encodeString(req.VdiffUuid))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
recordFound := len(qr.Rows) == 1
if recordFound && action == CreateAction {
return fmt.Errorf("vdiff with UUID %s already exists on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
} else if action == ResumeAction {
if !recordFound {
return fmt.Errorf("vdiff with UUID %s not found on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
}
if resp.Id, err = qr.Named().Row().ToInt64("id"); err != nil {
return fmt.Errorf("vdiff found with invalid id on tablet %v: %w",
vde.thisTablet.Alias, err)
}
}
if options, err = vde.fixupOptions(options); err != nil {
return err
}
optionsJSON, err := json.Marshal(options)
if err != nil {
return err
}
if action == CreateAction {
query := fmt.Sprintf(sqlNewVDiff,
encodeString(req.Keyspace), encodeString(req.Workflow), "pending", encodeString(string(optionsJSON)),
vde.thisTablet.Shard, topoproto.TabletDbName(vde.thisTablet), req.VdiffUuid)
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
if qr.InsertID == 0 {
return fmt.Errorf("unable to create vdiff for UUID %s on tablet %v (%w)",
req.VdiffUuid, vde.thisTablet.Alias, err)
}
resp.Id = int64(qr.InsertID)
} else {
query := fmt.Sprintf(sqlResumeVDiff, encodeString(string(optionsJSON)), encodeString(req.VdiffUuid))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
if qr.RowsAffected == 0 {
msg := fmt.Sprintf("no completed or stopped vdiff found for UUID %s on tablet %v",
req.VdiffUuid, vde.thisTablet.Alias)
if err != nil {
msg = fmt.Sprintf("%s (%v)", msg, err)
}
return fmt.Errorf(msg)
}
}
resp.VdiffUuid = req.VdiffUuid
qr, err = vde.getVDiffByID(ctx, dbClient, resp.Id)
if err != nil {
return err
}
vde.mu.Lock()
defer vde.mu.Unlock()
if err := vde.addController(qr.Named().Row(), options); err != nil {
return err
}
return nil
}
func (vde *Engine) handleShowAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var qr *sqltypes.Result
var err error
vdiffUUID := ""
if req.ActionArg == LastActionArg {
query := fmt.Sprintf(sqlGetMostRecentVDiff, encodeString(req.Keyspace), encodeString(req.Workflow))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
if len(qr.Rows) == 1 {
row := qr.Named().Row()
vdiffUUID = row.AsString("vdiff_uuid", "")
}
} else {
if uuidt, err := uuid.Parse(req.ActionArg); err == nil {
vdiffUUID = uuidt.String()
}
}
if vdiffUUID != "" {
resp.VdiffUuid = vdiffUUID
query := fmt.Sprintf(sqlGetVDiffByKeyspaceWorkflowUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(vdiffUUID))
if qr, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
switch len(qr.Rows) {
case 0:
return fmt.Errorf("no vdiff found for UUID %s keyspace %s and workflow %s on tablet %v",
vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias)
case 1:
row := qr.Named().Row()
vdiffID, _ := row["id"].ToInt64()
summary, err := vde.getVDiffSummary(vdiffID, dbClient)
resp.Output = summary
if err != nil {
return err
}
default:
return fmt.Errorf("too many vdiffs found (%d) for UUID %s keyspace %s and workflow %s on tablet %v",
len(qr.Rows), vdiffUUID, req.Keyspace, req.Workflow, vde.thisTablet.Alias)
}
}
switch req.ActionArg {
case AllActionArg:
if qr, err = dbClient.ExecuteFetch(sqlGetAllVDiffs, -1); err != nil {
return err
}
resp.Output = sqltypes.ResultToProto3(qr)
case LastActionArg:
default:
if _, err := uuid.Parse(req.ActionArg); err != nil {
return fmt.Errorf("action argument %s not supported", req.ActionArg)
}
}
return nil
}
func (vde *Engine) handleStopAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
vde.mu.Lock()
defer vde.mu.Unlock()
for _, controller := range vde.controllers {
if controller.uuid == req.VdiffUuid {
controller.Stop()
if err := controller.markStoppedByRequest(); err != nil {
return err
}
break
}
}
return nil
}
func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer.DBClient, action VDiffAction, req *tabletmanagerdatapb.VDiffRequest, resp *tabletmanagerdatapb.VDiffResponse) error {
var err error
query := ""
switch req.ActionArg {
case AllActionArg:
query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow))
default:
uuid, err := uuid.Parse(req.ActionArg)
if err != nil {
return fmt.Errorf("action argument %s not supported", req.ActionArg)
}
query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(uuid.String()))
}
if _, err = dbClient.ExecuteFetch(query, 1); err != nil {
return err
}
return nil
}

Просмотреть файл

@ -38,13 +38,14 @@ import (
)
/*
vdiff operation states: pending/started/completed/error/unknown
vdiff table states: pending/started/completed/error/unknown
vdiff operation states: pending/started/stopped/completed/error/unknown
vdiff table states: pending/started/stopped/completed/error/unknown
*/
type VDiffState string //nolint
const (
PendingState VDiffState = "pending"
StartedState VDiffState = "started"
StoppedState VDiffState = "stopped"
CompletedState VDiffState = "completed"
ErrorState VDiffState = "error"
UnknownState VDiffState = ""
@ -79,7 +80,7 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac
ct := &controller{
id: id,
uuid: row["vdiff_uuid"].String(),
uuid: row["vdiff_uuid"].ToString(),
workflow: row["workflow"].ToString(),
dbClientFactory: dbClientFactory,
ts: ts,
@ -223,6 +224,33 @@ func (ct *controller) start(ctx context.Context, dbClient binlogplayer.DBClient)
return nil
}
// markStoppedByRequest records the fact that this VDiff was stopped via user
// request and resets the error generated by cancelling the context to stop it:
// "vttablet: rpc error: code = Canceled desc = context canceled"
// This differentiates non-user requested stops that would occur e.g. during
// PlannedReparentShard or tablet restart, in those cases the error will be saved
// and will cause the VDiff to be retried ASAP -- which is NOT what we want here.
func (ct *controller) markStoppedByRequest() error {
dbClient := ct.vde.dbClientFactoryFiltered()
if err := dbClient.Connect(); err != nil {
return fmt.Errorf("encountered an error marking vdiff %s as stopped: %v", ct.uuid, err)
}
defer dbClient.Close()
query := fmt.Sprintf(sqlUpdateVDiffStopped, ct.id)
var res *sqltypes.Result
var err error
if res, err = dbClient.ExecuteFetch(query, 1); err != nil {
return fmt.Errorf("encountered an error marking vdiff %s as stopped: %v", ct.uuid, err)
}
// We don't mark it as stopped if it's already completed
if res.RowsAffected > 0 {
insertVDiffLog(ct.vde.ctx, dbClient, ct.id, fmt.Sprintf("State changed to: %s (by user request)", StoppedState))
}
return nil
}
func newMigrationSource() *migrationSource {
return &migrationSource{shardStreamer: &shardStreamer{}}
}

Просмотреть файл

@ -259,7 +259,7 @@ func (vde *Engine) getPendingVDiffs(ctx context.Context) (*sqltypes.Result, erro
}
func (vde *Engine) getVDiffsToRetry(ctx context.Context, dbClient binlogplayer.DBClient) (*sqltypes.Result, error) {
qr, err := withDDL.Exec(ctx, sqlGetVDiffsToRetry, dbClient.ExecuteFetch, dbClient.ExecuteFetch)
qr, err := withDDL.ExecIgnore(ctx, sqlGetVDiffsToRetry, dbClient.ExecuteFetch)
if err != nil {
return nil, err
}

Просмотреть файл

@ -83,7 +83,8 @@ const (
sqlNewVDiff = "insert into _vt.vdiff(keyspace, workflow, state, options, shard, db_name, vdiff_uuid) values(%s, %s, '%s', %s, '%s', '%s', '%s')"
sqlResumeVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.options = %s, vd.started_at = NULL, vd.completed_at = NULL, vd.state = 'pending',
vdt.state = 'pending' where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id and vd.state = 'completed' and vdt.state = 'completed'`
vdt.state = 'pending' where vd.vdiff_uuid = %s and vd.id = vdt.vdiff_id and vd.state in ('completed', 'stopped')
and vdt.state in ('completed', 'stopped')`
sqlRetryVDiff = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'pending', vd.last_error = '', vdt.state = 'pending'
where vd.id = %d and vd.id = vdt.vdiff_id and vd.state = 'error' and vdt.state = 'error'`
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s"
@ -101,7 +102,9 @@ const (
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vdt.vdiff_id = %d`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %d and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
sqlGetPendingVDiffs = "select * from _vt.vdiff where state = 'pending'"
sqlGetVDiffsToRetry = "select * from _vt.vdiff where state = 'error' and options->>'$.core_options.auto_retry' = 'true'"

Просмотреть файл

@ -36,18 +36,18 @@ type VDiffOutput struct {
Err error
}
func (wr *Wrangler) VDiff2(ctx context.Context, keyspace, workflowName string, command vdiff2.VDiffAction, subCommand, uuid string,
func (wr *Wrangler) VDiff2(ctx context.Context, keyspace, workflowName string, action vdiff2.VDiffAction, actionArg, uuid string,
options *tabletmanagerdata.VDiffOptions) (*VDiffOutput, error) {
log.Infof("VDiff2 called with %s, %s, %s, %s, %s, %+v", keyspace, workflowName, command, subCommand, uuid, options)
log.Infof("VDiff2 called with %s, %s, %s, %s, %s, %+v", keyspace, workflowName, action, actionArg, uuid, options)
req := &tabletmanagerdata.VDiffRequest{
Keyspace: keyspace,
Workflow: workflowName,
Command: string(command),
SubCommand: subCommand,
Options: options,
VdiffUuid: uuid,
Keyspace: keyspace,
Workflow: workflowName,
Action: string(action),
ActionArg: actionArg,
Options: options,
VdiffUuid: uuid,
}
output := &VDiffOutput{
Request: req,
@ -68,7 +68,7 @@ func (wr *Wrangler) VDiff2(ctx context.Context, keyspace, workflowName string, c
return err
})
if output.Err != nil {
log.Errorf("Error in command %s: %v", command, output.Err)
log.Errorf("Error executing action %s: %v", action, output.Err)
return nil, output.Err
}

Просмотреть файл

@ -501,8 +501,8 @@ message VExecResponse {
message VDiffRequest {
string keyspace = 1;
string workflow = 2;
string command = 3;
string sub_command = 4;
string action = 3;
string action_arg = 4;
string vdiff_uuid = 5;
VDiffOptions options = 6;
}

16
web/vtadmin/src/proto/vtadmin.d.ts сгенерированный поставляемый
Просмотреть файл

@ -22268,11 +22268,11 @@ export namespace tabletmanagerdata {
/** VDiffRequest workflow */
workflow?: (string|null);
/** VDiffRequest command */
command?: (string|null);
/** VDiffRequest action */
action?: (string|null);
/** VDiffRequest sub_command */
sub_command?: (string|null);
/** VDiffRequest action_arg */
action_arg?: (string|null);
/** VDiffRequest vdiff_uuid */
vdiff_uuid?: (string|null);
@ -22296,11 +22296,11 @@ export namespace tabletmanagerdata {
/** VDiffRequest workflow. */
public workflow: string;
/** VDiffRequest command. */
public command: string;
/** VDiffRequest action. */
public action: string;
/** VDiffRequest sub_command. */
public sub_command: string;
/** VDiffRequest action_arg. */
public action_arg: string;
/** VDiffRequest vdiff_uuid. */
public vdiff_uuid: string;

60
web/vtadmin/src/proto/vtadmin.js сгенерированный
Просмотреть файл

@ -50623,8 +50623,8 @@ $root.tabletmanagerdata = (function() {
* @interface IVDiffRequest
* @property {string|null} [keyspace] VDiffRequest keyspace
* @property {string|null} [workflow] VDiffRequest workflow
* @property {string|null} [command] VDiffRequest command
* @property {string|null} [sub_command] VDiffRequest sub_command
* @property {string|null} [action] VDiffRequest action
* @property {string|null} [action_arg] VDiffRequest action_arg
* @property {string|null} [vdiff_uuid] VDiffRequest vdiff_uuid
* @property {tabletmanagerdata.IVDiffOptions|null} [options] VDiffRequest options
*/
@ -50661,20 +50661,20 @@ $root.tabletmanagerdata = (function() {
VDiffRequest.prototype.workflow = "";
/**
* VDiffRequest command.
* @member {string} command
* VDiffRequest action.
* @member {string} action
* @memberof tabletmanagerdata.VDiffRequest
* @instance
*/
VDiffRequest.prototype.command = "";
VDiffRequest.prototype.action = "";
/**
* VDiffRequest sub_command.
* @member {string} sub_command
* VDiffRequest action_arg.
* @member {string} action_arg
* @memberof tabletmanagerdata.VDiffRequest
* @instance
*/
VDiffRequest.prototype.sub_command = "";
VDiffRequest.prototype.action_arg = "";
/**
* VDiffRequest vdiff_uuid.
@ -50720,10 +50720,10 @@ $root.tabletmanagerdata = (function() {
writer.uint32(/* id 1, wireType 2 =*/10).string(message.keyspace);
if (message.workflow != null && Object.hasOwnProperty.call(message, "workflow"))
writer.uint32(/* id 2, wireType 2 =*/18).string(message.workflow);
if (message.command != null && Object.hasOwnProperty.call(message, "command"))
writer.uint32(/* id 3, wireType 2 =*/26).string(message.command);
if (message.sub_command != null && Object.hasOwnProperty.call(message, "sub_command"))
writer.uint32(/* id 4, wireType 2 =*/34).string(message.sub_command);
if (message.action != null && Object.hasOwnProperty.call(message, "action"))
writer.uint32(/* id 3, wireType 2 =*/26).string(message.action);
if (message.action_arg != null && Object.hasOwnProperty.call(message, "action_arg"))
writer.uint32(/* id 4, wireType 2 =*/34).string(message.action_arg);
if (message.vdiff_uuid != null && Object.hasOwnProperty.call(message, "vdiff_uuid"))
writer.uint32(/* id 5, wireType 2 =*/42).string(message.vdiff_uuid);
if (message.options != null && Object.hasOwnProperty.call(message, "options"))
@ -50769,10 +50769,10 @@ $root.tabletmanagerdata = (function() {
message.workflow = reader.string();
break;
case 3:
message.command = reader.string();
message.action = reader.string();
break;
case 4:
message.sub_command = reader.string();
message.action_arg = reader.string();
break;
case 5:
message.vdiff_uuid = reader.string();
@ -50821,12 +50821,12 @@ $root.tabletmanagerdata = (function() {
if (message.workflow != null && message.hasOwnProperty("workflow"))
if (!$util.isString(message.workflow))
return "workflow: string expected";
if (message.command != null && message.hasOwnProperty("command"))
if (!$util.isString(message.command))
return "command: string expected";
if (message.sub_command != null && message.hasOwnProperty("sub_command"))
if (!$util.isString(message.sub_command))
return "sub_command: string expected";
if (message.action != null && message.hasOwnProperty("action"))
if (!$util.isString(message.action))
return "action: string expected";
if (message.action_arg != null && message.hasOwnProperty("action_arg"))
if (!$util.isString(message.action_arg))
return "action_arg: string expected";
if (message.vdiff_uuid != null && message.hasOwnProperty("vdiff_uuid"))
if (!$util.isString(message.vdiff_uuid))
return "vdiff_uuid: string expected";
@ -50854,10 +50854,10 @@ $root.tabletmanagerdata = (function() {
message.keyspace = String(object.keyspace);
if (object.workflow != null)
message.workflow = String(object.workflow);
if (object.command != null)
message.command = String(object.command);
if (object.sub_command != null)
message.sub_command = String(object.sub_command);
if (object.action != null)
message.action = String(object.action);
if (object.action_arg != null)
message.action_arg = String(object.action_arg);
if (object.vdiff_uuid != null)
message.vdiff_uuid = String(object.vdiff_uuid);
if (object.options != null) {
@ -50884,8 +50884,8 @@ $root.tabletmanagerdata = (function() {
if (options.defaults) {
object.keyspace = "";
object.workflow = "";
object.command = "";
object.sub_command = "";
object.action = "";
object.action_arg = "";
object.vdiff_uuid = "";
object.options = null;
}
@ -50893,10 +50893,10 @@ $root.tabletmanagerdata = (function() {
object.keyspace = message.keyspace;
if (message.workflow != null && message.hasOwnProperty("workflow"))
object.workflow = message.workflow;
if (message.command != null && message.hasOwnProperty("command"))
object.command = message.command;
if (message.sub_command != null && message.hasOwnProperty("sub_command"))
object.sub_command = message.sub_command;
if (message.action != null && message.hasOwnProperty("action"))
object.action = message.action;
if (message.action_arg != null && message.hasOwnProperty("action_arg"))
object.action_arg = message.action_arg;
if (message.vdiff_uuid != null && message.hasOwnProperty("vdiff_uuid"))
object.vdiff_uuid = message.vdiff_uuid;
if (message.options != null && message.hasOwnProperty("options"))