diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index a1cff2560b..e1aeaa6a76 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -1499,6 +1499,7 @@ type VStreamRequest struct { Target *query.Target `protobuf:"bytes,3,opt,name=target,proto3" json:"target,omitempty"` Position string `protobuf:"bytes,4,opt,name=position,proto3" json:"position,omitempty"` Filter *Filter `protobuf:"bytes,5,opt,name=filter,proto3" json:"filter,omitempty"` + TableLastPKs []*TableLastPK `protobuf:"bytes,6,rep,name=table_last_p_ks,json=tableLastPKs,proto3" json:"table_last_p_ks,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -1564,6 +1565,13 @@ func (m *VStreamRequest) GetFilter() *Filter { return nil } +func (m *VStreamRequest) GetTableLastPKs() []*TableLastPK { + if m != nil { + return m.TableLastPKs + } + return nil +} + // VStreamResponse is the response from VStreamer type VStreamResponse struct { Events []*VEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` @@ -2004,123 +2012,124 @@ func init() { func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_5fd02bcb2e350dad) } var fileDescriptor_5fd02bcb2e350dad = []byte{ - // 1888 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0x4b, 0x73, 0xdb, 0xc8, - 0x11, 0x36, 0x09, 0x3e, 0x1b, 0x12, 0x05, 0x8d, 0x1e, 0x61, 0x5c, 0xd9, 0x2d, 0x2d, 0x2a, 0x5e, - 0x6b, 0x55, 0x15, 0x6a, 0xc3, 0x64, 0x9d, 0x4b, 0x36, 0x1b, 0x3e, 0x60, 0x99, 0x16, 0x48, 0xca, - 0x43, 0x58, 0xde, 0xda, 0x0b, 0x0a, 0x06, 0x47, 0x12, 0x22, 0x80, 0x80, 0x81, 0xa1, 0xb4, 0xfc, - 0x01, 0xa9, 0xe4, 0x9e, 0x5f, 0x91, 0x53, 0x0e, 0xb9, 0x26, 0xd7, 0xfc, 0x8a, 0x5c, 0x73, 0x49, - 0xfe, 0x44, 0x6a, 0x1e, 0x00, 0x01, 0x79, 0x63, 0xcb, 0x5b, 0x95, 0x43, 0x72, 0x61, 0xcd, 0xf4, - 0x74, 0xf7, 0xf4, 0xeb, 0xeb, 0x69, 0x10, 0xb4, 0xd7, 0xde, 0xc2, 0x0f, 0x2f, 0xe7, 0x0e, 0x75, - 0x3a, 0x51, 0x1c, 0xd2, 0x10, 0xc1, 0x9a, 0xf2, 0x50, 0xbd, 0xa1, 0x71, 0xe4, 0x8a, 0x83, 0x87, - 0xea, 0x9b, 0x25, 0x89, 0x57, 0x72, 0xd3, 0xa2, 0x61, 0x14, 0xae, 0xa5, 0xf4, 0x31, 0xd4, 0x07, - 0x57, 0x4e, 0x9c, 0x10, 0x8a, 0xf6, 0xa1, 0xe6, 0xfa, 0x1e, 0x59, 0xd0, 0x76, 0xe9, 0xa0, 0x74, - 0x58, 0xc5, 0x72, 0x87, 0x10, 0x54, 0xdc, 0x70, 0xb1, 0x68, 0x97, 0x39, 0x95, 0xaf, 0x19, 0x6f, - 0x42, 0xe2, 0x1b, 0x12, 0xb7, 0x15, 0xc1, 0x2b, 0x76, 0xfa, 0x3f, 0x14, 0xd8, 0xee, 0x73, 0x3b, - 0xac, 0xd8, 0x59, 0x24, 0x8e, 0x4b, 0xbd, 0x70, 0x81, 0x4e, 0x00, 0x12, 0xea, 0x50, 0x12, 0x90, - 0x05, 0x4d, 0xda, 0xa5, 0x03, 0xe5, 0x50, 0xed, 0x3e, 0xee, 0xe4, 0x3c, 0x78, 0x4b, 0xa4, 0x33, - 0x4b, 0xf9, 0x71, 0x4e, 0x14, 0x75, 0x41, 0x25, 0x37, 0x64, 0x41, 0x6d, 0x1a, 0x5e, 0x93, 0x45, - 0xbb, 0x72, 0x50, 0x3a, 0x54, 0xbb, 0xdb, 0x1d, 0xe1, 0xa0, 0xc1, 0x4e, 0x2c, 0x76, 0x80, 0x81, - 0x64, 0xeb, 0x87, 0x7f, 0x2b, 0x43, 0x33, 0xd3, 0x86, 0x4c, 0x68, 0xb8, 0x0e, 0x25, 0x97, 0x61, - 0xbc, 0xe2, 0x6e, 0xb6, 0xba, 0x9f, 0xdf, 0xd3, 0x90, 0xce, 0x40, 0xca, 0xe1, 0x4c, 0x03, 0xfa, - 0x09, 0xd4, 0x5d, 0x11, 0x3d, 0x1e, 0x1d, 0xb5, 0xbb, 0x93, 0x57, 0x26, 0x03, 0x8b, 0x53, 0x1e, - 0xa4, 0x81, 0x92, 0xbc, 0xf1, 0x79, 0xc8, 0x36, 0x30, 0x5b, 0xea, 0x7f, 0x2c, 0x41, 0x23, 0xd5, - 0x8b, 0x76, 0x60, 0xab, 0x6f, 0xda, 0x2f, 0x27, 0xd8, 0x18, 0x4c, 0x4f, 0x26, 0xa3, 0x6f, 0x8c, - 0xa1, 0xf6, 0x00, 0x6d, 0x40, 0xa3, 0x6f, 0xda, 0x7d, 0xe3, 0x64, 0x34, 0xd1, 0x4a, 0x68, 0x13, - 0x9a, 0x7d, 0xd3, 0x1e, 0x4c, 0xc7, 0xe3, 0x91, 0xa5, 0x95, 0xd1, 0x16, 0xa8, 0x7d, 0xd3, 0xc6, - 0x53, 0xd3, 0xec, 0xf7, 0x06, 0xa7, 0x9a, 0x82, 0xf6, 0x60, 0xbb, 0x6f, 0xda, 0xc3, 0xb1, 0x69, - 0x0f, 0x8d, 0x33, 0x6c, 0x0c, 0x7a, 0x96, 0x31, 0xd4, 0x2a, 0x08, 0xa0, 0xc6, 0xc8, 0x43, 0x53, - 0xab, 0xca, 0xf5, 0xcc, 0xb0, 0xb4, 0x9a, 0x54, 0x37, 0x9a, 0xcc, 0x0c, 0x6c, 0x69, 0x75, 0xb9, - 0x7d, 0x79, 0x36, 0xec, 0x59, 0x86, 0xd6, 0x90, 0xdb, 0xa1, 0x61, 0x1a, 0x96, 0xa1, 0x35, 0x9f, - 0x57, 0x1a, 0x65, 0x4d, 0x79, 0x5e, 0x69, 0x28, 0x5a, 0x45, 0xff, 0x43, 0x09, 0xf6, 0x66, 0x34, - 0x26, 0x4e, 0x70, 0x4a, 0x56, 0xd8, 0x59, 0x5c, 0x12, 0x4c, 0xde, 0x2c, 0x49, 0x42, 0xd1, 0x43, - 0x68, 0x44, 0x61, 0xe2, 0xb1, 0xd8, 0xf1, 0x00, 0x37, 0x71, 0xb6, 0x47, 0xc7, 0xd0, 0xbc, 0x26, - 0x2b, 0x3b, 0x66, 0xfc, 0x32, 0x60, 0xa8, 0x93, 0x15, 0x64, 0xa6, 0xa9, 0x71, 0x2d, 0x57, 0xf9, - 0xf8, 0x2a, 0xef, 0x8f, 0xaf, 0x7e, 0x01, 0xfb, 0x77, 0x8d, 0x4a, 0xa2, 0x70, 0x91, 0x10, 0x64, - 0x02, 0x12, 0x82, 0x36, 0x5d, 0xe7, 0x96, 0xdb, 0xa7, 0x76, 0x3f, 0x7a, 0x67, 0x01, 0xe0, 0xed, - 0xd7, 0x77, 0x49, 0xfa, 0xb7, 0xb0, 0x23, 0xee, 0xb1, 0x9c, 0xd7, 0x3e, 0x49, 0xee, 0xe3, 0xfa, - 0x3e, 0xd4, 0x28, 0x67, 0x6e, 0x97, 0x0f, 0x94, 0xc3, 0x26, 0x96, 0xbb, 0x0f, 0xf5, 0x70, 0x0e, - 0xbb, 0xc5, 0x9b, 0xff, 0x2b, 0xfe, 0xfd, 0x1c, 0x2a, 0x78, 0xe9, 0x13, 0xb4, 0x0b, 0xd5, 0xc0, - 0xa1, 0xee, 0x95, 0xf4, 0x46, 0x6c, 0x98, 0x2b, 0x17, 0x9e, 0x4f, 0x49, 0xcc, 0x53, 0xd8, 0xc4, - 0x72, 0xa7, 0xff, 0xb9, 0x04, 0xb5, 0xa7, 0x7c, 0x89, 0x3e, 0x85, 0x6a, 0xbc, 0x64, 0xce, 0x0a, - 0xac, 0x6b, 0x79, 0x0b, 0x98, 0x66, 0x2c, 0x8e, 0xd1, 0x08, 0x5a, 0x17, 0x1e, 0xf1, 0xe7, 0x1c, - 0xba, 0xe3, 0x70, 0x2e, 0xaa, 0xa2, 0xd5, 0xfd, 0x24, 0x2f, 0x20, 0x74, 0x76, 0x9e, 0x16, 0x18, - 0xf1, 0x1d, 0x41, 0xfd, 0x09, 0xb4, 0x8a, 0x1c, 0x0c, 0x4e, 0x06, 0xc6, 0xf6, 0x74, 0x62, 0x8f, - 0x47, 0xb3, 0x71, 0xcf, 0x1a, 0x3c, 0xd3, 0x1e, 0x70, 0xc4, 0x18, 0x33, 0xcb, 0x36, 0x9e, 0x3e, - 0x9d, 0x62, 0x4b, 0x2b, 0xe9, 0xff, 0x2a, 0xc3, 0x86, 0x08, 0xca, 0x2c, 0x5c, 0xc6, 0x2e, 0x61, - 0x59, 0xbc, 0x26, 0xab, 0x24, 0x72, 0x5c, 0x92, 0x66, 0x31, 0xdd, 0xb3, 0x80, 0x24, 0x57, 0x4e, - 0x3c, 0x97, 0x9e, 0x8b, 0x0d, 0xfa, 0x02, 0x54, 0x9e, 0x4d, 0x6a, 0xd3, 0x55, 0x44, 0x78, 0x1e, - 0x5b, 0xdd, 0xdd, 0x75, 0x61, 0xf3, 0x5c, 0x51, 0x6b, 0x15, 0x11, 0x0c, 0x34, 0x5b, 0x17, 0xd1, - 0x50, 0xb9, 0x07, 0x1a, 0xd6, 0x35, 0x54, 0x2d, 0xd4, 0xd0, 0x51, 0x96, 0x90, 0x9a, 0xd4, 0xf2, - 0x56, 0xf4, 0xd2, 0x24, 0xa1, 0x0e, 0xd4, 0xc2, 0x85, 0x3d, 0x9f, 0xfb, 0xed, 0x3a, 0x37, 0xf3, - 0x07, 0x79, 0xde, 0xe9, 0x62, 0x38, 0x34, 0x7b, 0xa2, 0x2c, 0xaa, 0xe1, 0x62, 0x38, 0xf7, 0xd1, - 0x23, 0x68, 0x91, 0x6f, 0x29, 0x89, 0x17, 0x8e, 0x6f, 0x07, 0x2b, 0xd6, 0xbd, 0x1a, 0xdc, 0xf5, - 0xcd, 0x94, 0x3a, 0x66, 0x44, 0xf4, 0x29, 0x6c, 0x25, 0x34, 0x8c, 0x6c, 0xe7, 0x82, 0x92, 0xd8, - 0x76, 0xc3, 0x68, 0xd5, 0x6e, 0x1e, 0x94, 0x0e, 0x1b, 0x78, 0x93, 0x91, 0x7b, 0x8c, 0x3a, 0x08, - 0xa3, 0x95, 0xfe, 0x02, 0x9a, 0x38, 0xbc, 0x1d, 0x5c, 0x71, 0x7f, 0x74, 0xa8, 0xbd, 0x26, 0x17, - 0x61, 0x4c, 0x64, 0xa1, 0x82, 0x6c, 0xe4, 0x38, 0xbc, 0xc5, 0xf2, 0x04, 0x1d, 0x40, 0x95, 0xeb, - 0x94, 0xed, 0x22, 0xcf, 0x22, 0x0e, 0x74, 0x07, 0x1a, 0x38, 0xbc, 0xe5, 0x69, 0x47, 0x1f, 0x81, - 0x08, 0xb0, 0xbd, 0x70, 0x82, 0x34, 0x7b, 0x4d, 0x4e, 0x99, 0x38, 0x01, 0x41, 0x4f, 0x40, 0x8d, - 0xc3, 0x5b, 0xdb, 0xe5, 0xd7, 0x0b, 0x24, 0xaa, 0xdd, 0xbd, 0x42, 0x71, 0xa6, 0xc6, 0x61, 0x88, - 0xd3, 0x65, 0xa2, 0xbf, 0x00, 0x58, 0xd7, 0xd6, 0xfb, 0x2e, 0xf9, 0x31, 0xcb, 0x06, 0xf1, 0xe7, - 0xa9, 0xfe, 0x0d, 0x69, 0x32, 0xd7, 0x80, 0xe5, 0x99, 0xfe, 0xfb, 0x12, 0x34, 0x67, 0xac, 0x7a, - 0x4e, 0xa8, 0x37, 0xff, 0x1e, 0x35, 0x87, 0xa0, 0x72, 0x49, 0xbd, 0x39, 0x2f, 0xb6, 0x26, 0xe6, - 0x6b, 0xf4, 0x45, 0x6a, 0x58, 0x64, 0x5f, 0x27, 0xed, 0x0a, 0xbf, 0xbd, 0x90, 0x5f, 0x5e, 0x88, - 0xa6, 0x93, 0xd0, 0xb3, 0x53, 0xdc, 0xe0, 0xac, 0x67, 0xa7, 0x89, 0xfe, 0x15, 0x54, 0xcf, 0xb9, - 0x15, 0x4f, 0x40, 0xe5, 0xca, 0x6d, 0xa6, 0x2d, 0xc5, 0x6e, 0x21, 0x3c, 0x99, 0xc5, 0x18, 0x92, - 0x74, 0x99, 0xe8, 0x3d, 0xd8, 0x3c, 0x95, 0xd6, 0x72, 0x86, 0x0f, 0x77, 0x47, 0xff, 0x4b, 0x19, - 0xea, 0xcf, 0xc3, 0x25, 0x2b, 0x28, 0xd4, 0x82, 0xb2, 0x37, 0xe7, 0x72, 0x0a, 0x2e, 0x7b, 0x73, - 0xf4, 0x6b, 0x68, 0x05, 0xde, 0x65, 0xec, 0xb0, 0xb2, 0x14, 0x08, 0x13, 0x4d, 0xe2, 0x87, 0x79, - 0xcb, 0xc6, 0x29, 0x07, 0x87, 0xd9, 0x66, 0x90, 0xdf, 0xe6, 0x80, 0xa3, 0x14, 0x80, 0xf3, 0x08, - 0x5a, 0x7e, 0xe8, 0x3a, 0xbe, 0x9d, 0xb5, 0xed, 0x8a, 0x28, 0x6e, 0x4e, 0x3d, 0x4b, 0x7b, 0xf7, - 0x9d, 0xb8, 0x54, 0xef, 0x19, 0x17, 0xf4, 0x25, 0x6c, 0x44, 0x4e, 0x4c, 0x3d, 0xd7, 0x8b, 0x1c, - 0x36, 0xf8, 0xd4, 0xb8, 0x60, 0xc1, 0xec, 0x42, 0xdc, 0x70, 0x81, 0x1d, 0x7d, 0x06, 0x5a, 0xc2, - 0x5b, 0x92, 0x7d, 0x1b, 0xc6, 0xd7, 0x17, 0x7e, 0x78, 0x9b, 0xb4, 0xeb, 0xdc, 0xfe, 0x2d, 0x41, - 0x7f, 0x95, 0x92, 0xf5, 0x3f, 0x29, 0x50, 0x3b, 0x17, 0xd5, 0x79, 0x04, 0x15, 0x1e, 0x23, 0x31, - 0xdc, 0xec, 0xe7, 0x2f, 0x13, 0x1c, 0x3c, 0x40, 0x9c, 0x07, 0xfd, 0x08, 0x9a, 0xd4, 0x0b, 0x48, - 0x42, 0x9d, 0x20, 0xe2, 0x41, 0x55, 0xf0, 0x9a, 0xf0, 0x9d, 0x25, 0xa6, 0x81, 0xc2, 0x7a, 0x87, - 0x08, 0x13, 0x5b, 0xa2, 0x9f, 0x42, 0x93, 0x61, 0x8a, 0x0f, 0x5c, 0xed, 0x2a, 0x07, 0xe9, 0xee, - 0x1d, 0x44, 0xf1, 0x6b, 0x71, 0x23, 0x4e, 0x51, 0xfa, 0x0b, 0x50, 0x39, 0x0a, 0xa4, 0x90, 0x68, - 0x5a, 0xfb, 0xc5, 0xa6, 0x95, 0xa2, 0x0d, 0xc3, 0xba, 0xcf, 0xa3, 0xc7, 0x50, 0xbd, 0xe1, 0x26, - 0xd5, 0xe5, 0xe0, 0x97, 0x77, 0x8e, 0x87, 0x5f, 0x9c, 0xb3, 0x57, 0xf5, 0x37, 0xa2, 0x9a, 0x78, - 0xbb, 0xba, 0xf3, 0xaa, 0xca, 0x42, 0xc3, 0x29, 0x0f, 0xf7, 0x2a, 0xf0, 0x79, 0xc7, 0x62, 0x5e, - 0x05, 0x3e, 0xfa, 0x04, 0x36, 0xdc, 0x65, 0x1c, 0xf3, 0x51, 0xd3, 0x0b, 0x48, 0x7b, 0x97, 0x07, - 0x47, 0x95, 0x34, 0xcb, 0x0b, 0x08, 0xfa, 0x25, 0xb4, 0x7c, 0x27, 0xa1, 0x0c, 0x6c, 0xd2, 0x91, - 0x3d, 0x7e, 0x55, 0x01, 0x71, 0x02, 0x6c, 0xc2, 0x13, 0xd5, 0x5f, 0x6f, 0xf4, 0x2b, 0xd8, 0x18, - 0x7b, 0x0b, 0x2f, 0x70, 0x7c, 0x0e, 0x4a, 0x16, 0xec, 0x5c, 0x3b, 0xe1, 0xeb, 0xfb, 0x75, 0x12, - 0xf4, 0x31, 0xa8, 0xcc, 0x04, 0x37, 0xf4, 0x97, 0xc1, 0x42, 0x54, 0xb8, 0x82, 0x9b, 0xd1, 0xe9, - 0x40, 0x10, 0x18, 0x3a, 0xe5, 0x4d, 0x33, 0xf7, 0x8a, 0x04, 0x0e, 0xfa, 0x3c, 0x43, 0x83, 0x40, - 0x78, 0xbb, 0x88, 0xa3, 0xb5, 0x51, 0x29, 0x4e, 0xf4, 0xdf, 0x95, 0xa1, 0x75, 0x2e, 0xe6, 0x8e, - 0x74, 0xd6, 0xf9, 0x0a, 0x76, 0xc8, 0xc5, 0x05, 0x71, 0xa9, 0x77, 0x43, 0x6c, 0xd7, 0xf1, 0x7d, - 0x12, 0xdb, 0x12, 0xb5, 0x6a, 0x77, 0xab, 0x23, 0xbe, 0x3f, 0x06, 0x9c, 0x3e, 0x1a, 0xe2, 0xed, - 0x8c, 0x57, 0x92, 0xe6, 0xc8, 0x80, 0x1d, 0x2f, 0x08, 0xc8, 0xdc, 0x73, 0x68, 0x5e, 0x81, 0x68, - 0xf3, 0x7b, 0xd2, 0xd3, 0x73, 0xeb, 0xc4, 0xa1, 0x64, 0xad, 0x26, 0x93, 0xc8, 0xd4, 0x3c, 0x62, - 0xce, 0xc4, 0x97, 0xd9, 0xf8, 0xb4, 0x29, 0x25, 0x2d, 0x4e, 0xc4, 0xf2, 0xb0, 0x30, 0x9a, 0x55, - 0xee, 0x8c, 0x66, 0xeb, 0xe7, 0xb3, 0xfa, 0xbe, 0xe7, 0x53, 0xff, 0x12, 0xb6, 0xb2, 0x40, 0xc8, - 0xd1, 0xeb, 0x08, 0x6a, 0x3c, 0xfd, 0x69, 0x38, 0xd1, 0xdb, 0x90, 0xc3, 0x92, 0x43, 0xff, 0x6d, - 0x19, 0x50, 0x2a, 0x1f, 0xde, 0x26, 0xff, 0xa3, 0xc1, 0xdc, 0x85, 0x2a, 0xa7, 0xcb, 0x48, 0x8a, - 0x0d, 0x8b, 0x03, 0x2b, 0xf0, 0xe8, 0x3a, 0x0b, 0xa3, 0x10, 0x7e, 0xc1, 0x7e, 0x31, 0x49, 0x96, - 0x3e, 0xc5, 0x92, 0x43, 0xff, 0x6b, 0x09, 0x76, 0x0a, 0x71, 0x90, 0xb1, 0x5c, 0x57, 0x7c, 0xe9, - 0x1d, 0x15, 0x7f, 0x08, 0x8d, 0xe8, 0xfa, 0x1d, 0xc8, 0xc8, 0x4e, 0xbf, 0xb3, 0x85, 0x7d, 0x0c, - 0x95, 0x98, 0xb5, 0x52, 0xf1, 0x3e, 0xe6, 0x07, 0x0a, 0x4e, 0x67, 0x53, 0x49, 0xc1, 0x8f, 0xc2, - 0x54, 0x22, 0xed, 0xf7, 0x40, 0xcd, 0x21, 0x9b, 0xb5, 0x02, 0xf1, 0xf0, 0xa6, 0x0d, 0x41, 0xa6, - 0xee, 0x3f, 0x3e, 0xbe, 0x62, 0x5e, 0x14, 0x1b, 0xd6, 0x85, 0xdd, 0x30, 0x88, 0x7c, 0x42, 0x89, - 0x48, 0x59, 0x03, 0xaf, 0x09, 0xfa, 0xd7, 0xa0, 0xe6, 0x24, 0xdf, 0x37, 0x7c, 0xac, 0x93, 0xa0, - 0xbc, 0x37, 0x09, 0x7f, 0x2f, 0xc1, 0xde, 0xba, 0x98, 0x97, 0x3e, 0xfd, 0xbf, 0xaa, 0x47, 0x3d, - 0x86, 0xfd, 0xbb, 0xde, 0x7d, 0x50, 0x95, 0x7d, 0x8f, 0xda, 0x39, 0xfa, 0x15, 0xa8, 0xb9, 0x19, - 0x9a, 0x7d, 0x6a, 0x8f, 0x4e, 0x26, 0x53, 0x6c, 0x68, 0x0f, 0x50, 0x03, 0x2a, 0x33, 0x6b, 0x7a, - 0xa6, 0x95, 0xd8, 0xca, 0xf8, 0xda, 0x18, 0x88, 0xcf, 0x77, 0xb6, 0xb2, 0x25, 0x93, 0x72, 0xf4, - 0xcf, 0x12, 0xc0, 0xfa, 0x95, 0x46, 0x2a, 0xd4, 0x5f, 0x4e, 0x4e, 0x27, 0xd3, 0x57, 0x13, 0xa1, - 0xe0, 0xc4, 0x1a, 0x0d, 0xb5, 0x12, 0x6a, 0x42, 0x55, 0xfc, 0x1f, 0x50, 0x66, 0x37, 0xc8, 0x3f, - 0x03, 0x14, 0xb4, 0x01, 0x8d, 0xec, 0x9f, 0x80, 0x0a, 0xaa, 0x83, 0x92, 0x7d, 0xef, 0xcb, 0x0f, - 0xfc, 0x1a, 0x53, 0x88, 0x8d, 0x33, 0xb3, 0x37, 0x30, 0xb4, 0x3a, 0x3b, 0xc8, 0x3e, 0xf5, 0x01, - 0x6a, 0xe9, 0x77, 0x3e, 0x93, 0x9c, 0x19, 0x96, 0x06, 0xec, 0x9e, 0xa9, 0xf5, 0xcc, 0xc0, 0x9a, - 0xca, 0x68, 0x78, 0xfa, 0x4a, 0xdb, 0x60, 0xb4, 0xa7, 0x23, 0xc3, 0x1c, 0x6a, 0x9b, 0x68, 0x13, - 0x9a, 0xcf, 0x8c, 0x1e, 0xb6, 0xfa, 0x46, 0xcf, 0xd2, 0x5a, 0xec, 0xe4, 0x9c, 0x1b, 0xb8, 0xc5, - 0xae, 0x79, 0x3e, 0x7d, 0x89, 0x27, 0x3d, 0x53, 0xd3, 0xd8, 0xe6, 0xdc, 0xc0, 0xb3, 0xd1, 0x74, - 0xa2, 0x6d, 0xb3, 0x7b, 0xcc, 0xde, 0xcc, 0x3a, 0x3b, 0xd5, 0xd0, 0xd1, 0x63, 0xf6, 0x30, 0xe5, - 0xc7, 0x34, 0x80, 0x9a, 0xd5, 0xeb, 0x9b, 0xc6, 0x4c, 0x7b, 0xc0, 0xd6, 0xb3, 0x67, 0x3d, 0x3c, - 0x9c, 0x69, 0xa5, 0xfe, 0x67, 0xdf, 0x3c, 0xbe, 0xf1, 0x28, 0x49, 0x92, 0x8e, 0x17, 0x1e, 0x8b, - 0xd5, 0xf1, 0x65, 0x78, 0x7c, 0x43, 0x8f, 0xf9, 0x7f, 0x58, 0xc7, 0x6b, 0x90, 0xbd, 0xae, 0x71, - 0xca, 0xcf, 0xfe, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x74, 0x10, 0xf9, 0x24, 0x1f, 0x13, 0x00, 0x00, + // 1901 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0x4b, 0x73, 0xe3, 0xc6, + 0x11, 0x5e, 0xbe, 0xc9, 0x06, 0x45, 0x41, 0xa3, 0x47, 0x98, 0xad, 0xd8, 0x25, 0xa3, 0x62, 0xaf, + 0xac, 0xaa, 0x50, 0x0e, 0x13, 0x6f, 0x2e, 0xb1, 0x1d, 0x3e, 0xb0, 0x5a, 0xae, 0xf8, 0xd0, 0x0e, + 0xb1, 0x5a, 0x97, 0x2f, 0x28, 0x08, 0x1c, 0x49, 0x88, 0x00, 0x02, 0x0b, 0x0c, 0x25, 0xf3, 0x07, + 0xa4, 0x2a, 0xf7, 0xfc, 0x8a, 0x9c, 0x72, 0xc8, 0x35, 0xb9, 0x26, 0x7f, 0x22, 0xd7, 0x5c, 0x92, + 0x3f, 0x91, 0x9a, 0x07, 0x5e, 0x5a, 0x7b, 0xa5, 0x75, 0x55, 0x0e, 0xc9, 0x85, 0x35, 0xd3, 0xd3, + 0xdd, 0xd3, 0xaf, 0xaf, 0xd1, 0x1c, 0x50, 0xcf, 0x9d, 0xa5, 0xeb, 0x5f, 0x2e, 0x2c, 0x6a, 0x75, + 0x82, 0xd0, 0xa7, 0x3e, 0x82, 0x94, 0xf2, 0x58, 0xb9, 0xa1, 0x61, 0x60, 0x8b, 0x83, 0xc7, 0xca, + 0x9b, 0x15, 0x09, 0xd7, 0x72, 0xd3, 0xa2, 0x7e, 0xe0, 0xa7, 0x52, 0xda, 0x04, 0x6a, 0x83, 0x2b, + 0x2b, 0x8c, 0x08, 0x45, 0x7b, 0x50, 0xb5, 0x5d, 0x87, 0x2c, 0x69, 0xbb, 0xb0, 0x5f, 0x38, 0xa8, + 0x60, 0xb9, 0x43, 0x08, 0xca, 0xb6, 0xbf, 0x5c, 0xb6, 0x8b, 0x9c, 0xca, 0xd7, 0x8c, 0x37, 0x22, + 0xe1, 0x0d, 0x09, 0xdb, 0x25, 0xc1, 0x2b, 0x76, 0xda, 0x3f, 0x4b, 0xb0, 0xd5, 0xe7, 0x76, 0x18, + 0xa1, 0xb5, 0x8c, 0x2c, 0x9b, 0x3a, 0xfe, 0x12, 0x1d, 0x03, 0x44, 0xd4, 0xa2, 0xc4, 0x23, 0x4b, + 0x1a, 0xb5, 0x0b, 0xfb, 0xa5, 0x03, 0xa5, 0xfb, 0xa4, 0x93, 0xf1, 0xe0, 0x2d, 0x91, 0xce, 0x3c, + 0xe6, 0xc7, 0x19, 0x51, 0xd4, 0x05, 0x85, 0xdc, 0x90, 0x25, 0x35, 0xa9, 0x7f, 0x4d, 0x96, 0xed, + 0xf2, 0x7e, 0xe1, 0x40, 0xe9, 0x6e, 0x75, 0x84, 0x83, 0x3a, 0x3b, 0x31, 0xd8, 0x01, 0x06, 0x92, + 0xac, 0x1f, 0xff, 0xad, 0x08, 0x8d, 0x44, 0x1b, 0x1a, 0x43, 0xdd, 0xb6, 0x28, 0xb9, 0xf4, 0xc3, + 0x35, 0x77, 0xb3, 0xd5, 0xfd, 0xec, 0x81, 0x86, 0x74, 0x06, 0x52, 0x0e, 0x27, 0x1a, 0xd0, 0xcf, + 0xa0, 0x66, 0x8b, 0xe8, 0xf1, 0xe8, 0x28, 0xdd, 0xed, 0xac, 0x32, 0x19, 0x58, 0x1c, 0xf3, 0x20, + 0x15, 0x4a, 0xd1, 0x1b, 0x97, 0x87, 0xac, 0x89, 0xd9, 0x52, 0xfb, 0x63, 0x01, 0xea, 0xb1, 0x5e, + 0xb4, 0x0d, 0x9b, 0xfd, 0xb1, 0xf9, 0x6a, 0x8a, 0xf5, 0xc1, 0xec, 0x78, 0x3a, 0xfa, 0x46, 0x1f, + 0xaa, 0x8f, 0x50, 0x13, 0xea, 0xfd, 0xb1, 0xd9, 0xd7, 0x8f, 0x47, 0x53, 0xb5, 0x80, 0x36, 0xa0, + 0xd1, 0x1f, 0x9b, 0x83, 0xd9, 0x64, 0x32, 0x32, 0xd4, 0x22, 0xda, 0x04, 0xa5, 0x3f, 0x36, 0xf1, + 0x6c, 0x3c, 0xee, 0xf7, 0x06, 0x27, 0x6a, 0x09, 0xed, 0xc2, 0x56, 0x7f, 0x6c, 0x0e, 0x27, 0x63, + 0x73, 0xa8, 0x9f, 0x62, 0x7d, 0xd0, 0x33, 0xf4, 0xa1, 0x5a, 0x46, 0x00, 0x55, 0x46, 0x1e, 0x8e, + 0xd5, 0x8a, 0x5c, 0xcf, 0x75, 0x43, 0xad, 0x4a, 0x75, 0xa3, 0xe9, 0x5c, 0xc7, 0x86, 0x5a, 0x93, + 0xdb, 0x57, 0xa7, 0xc3, 0x9e, 0xa1, 0xab, 0x75, 0xb9, 0x1d, 0xea, 0x63, 0xdd, 0xd0, 0xd5, 0xc6, + 0x8b, 0x72, 0xbd, 0xa8, 0x96, 0x5e, 0x94, 0xeb, 0x25, 0xb5, 0xac, 0xfd, 0xa1, 0x00, 0xbb, 0x73, + 0x1a, 0x12, 0xcb, 0x3b, 0x21, 0x6b, 0x6c, 0x2d, 0x2f, 0x09, 0x26, 0x6f, 0x56, 0x24, 0xa2, 0xe8, + 0x31, 0xd4, 0x03, 0x3f, 0x72, 0x58, 0xec, 0x78, 0x80, 0x1b, 0x38, 0xd9, 0xa3, 0x23, 0x68, 0x5c, + 0x93, 0xb5, 0x19, 0x32, 0x7e, 0x19, 0x30, 0xd4, 0x49, 0x0a, 0x32, 0xd1, 0x54, 0xbf, 0x96, 0xab, + 0x6c, 0x7c, 0x4b, 0xf7, 0xc7, 0x57, 0xbb, 0x80, 0xbd, 0xbb, 0x46, 0x45, 0x81, 0xbf, 0x8c, 0x08, + 0x1a, 0x03, 0x12, 0x82, 0x26, 0x4d, 0x73, 0xcb, 0xed, 0x53, 0xba, 0x1f, 0xbc, 0xb3, 0x00, 0xf0, + 0xd6, 0xf9, 0x5d, 0x92, 0xf6, 0x2d, 0x6c, 0x8b, 0x7b, 0x0c, 0xeb, 0xdc, 0x25, 0xd1, 0x43, 0x5c, + 0xdf, 0x83, 0x2a, 0xe5, 0xcc, 0xed, 0xe2, 0x7e, 0xe9, 0xa0, 0x81, 0xe5, 0xee, 0x7d, 0x3d, 0x5c, + 0xc0, 0x4e, 0xfe, 0xe6, 0xff, 0x8a, 0x7f, 0xbf, 0x84, 0x32, 0x5e, 0xb9, 0x04, 0xed, 0x40, 0xc5, + 0xb3, 0xa8, 0x7d, 0x25, 0xbd, 0x11, 0x1b, 0xe6, 0xca, 0x85, 0xe3, 0x52, 0x12, 0xf2, 0x14, 0x36, + 0xb0, 0xdc, 0x69, 0x7f, 0x2e, 0x40, 0xf5, 0x19, 0x5f, 0xa2, 0x4f, 0xa0, 0x12, 0xae, 0x98, 0xb3, + 0x02, 0xeb, 0x6a, 0xd6, 0x02, 0xa6, 0x19, 0x8b, 0x63, 0x34, 0x82, 0xd6, 0x85, 0x43, 0xdc, 0x05, + 0x87, 0xee, 0xc4, 0x5f, 0x88, 0xaa, 0x68, 0x75, 0x3f, 0xca, 0x0a, 0x08, 0x9d, 0x9d, 0x67, 0x39, + 0x46, 0x7c, 0x47, 0x50, 0x7b, 0x0a, 0xad, 0x3c, 0x07, 0x83, 0x93, 0x8e, 0xb1, 0x39, 0x9b, 0x9a, + 0x93, 0xd1, 0x7c, 0xd2, 0x33, 0x06, 0xcf, 0xd5, 0x47, 0x1c, 0x31, 0xfa, 0xdc, 0x30, 0xf5, 0x67, + 0xcf, 0x66, 0xd8, 0x50, 0x0b, 0xda, 0xbf, 0x8b, 0xd0, 0x14, 0x41, 0x99, 0xfb, 0xab, 0xd0, 0x26, + 0x2c, 0x8b, 0xd7, 0x64, 0x1d, 0x05, 0x96, 0x4d, 0xe2, 0x2c, 0xc6, 0x7b, 0x16, 0x90, 0xe8, 0xca, + 0x0a, 0x17, 0xd2, 0x73, 0xb1, 0x41, 0x9f, 0x83, 0xc2, 0xb3, 0x49, 0x4d, 0xba, 0x0e, 0x08, 0xcf, + 0x63, 0xab, 0xbb, 0x93, 0x16, 0x36, 0xcf, 0x15, 0x35, 0xd6, 0x01, 0xc1, 0x40, 0x93, 0x75, 0x1e, + 0x0d, 0xe5, 0x07, 0xa0, 0x21, 0xad, 0xa1, 0x4a, 0xae, 0x86, 0x0e, 0x93, 0x84, 0x54, 0xa5, 0x96, + 0xb7, 0xa2, 0x17, 0x27, 0x09, 0x75, 0xa0, 0xea, 0x2f, 0xcd, 0xc5, 0xc2, 0x6d, 0xd7, 0xb8, 0x99, + 0x3f, 0xca, 0xf2, 0xce, 0x96, 0xc3, 0xe1, 0xb8, 0x27, 0xca, 0xa2, 0xe2, 0x2f, 0x87, 0x0b, 0x17, + 0x7d, 0x0c, 0x2d, 0xf2, 0x2d, 0x25, 0xe1, 0xd2, 0x72, 0x4d, 0x6f, 0xcd, 0xba, 0x57, 0x9d, 0xbb, + 0xbe, 0x11, 0x53, 0x27, 0x8c, 0x88, 0x3e, 0x81, 0xcd, 0x88, 0xfa, 0x81, 0x69, 0x5d, 0x50, 0x12, + 0x9a, 0xb6, 0x1f, 0xac, 0xdb, 0x8d, 0xfd, 0xc2, 0x41, 0x1d, 0x6f, 0x30, 0x72, 0x8f, 0x51, 0x07, + 0x7e, 0xb0, 0xd6, 0x5e, 0x42, 0x03, 0xfb, 0xb7, 0x83, 0x2b, 0xee, 0x8f, 0x06, 0xd5, 0x73, 0x72, + 0xe1, 0x87, 0x44, 0x16, 0x2a, 0xc8, 0x46, 0x8e, 0xfd, 0x5b, 0x2c, 0x4f, 0xd0, 0x3e, 0x54, 0xb8, + 0x4e, 0xd9, 0x2e, 0xb2, 0x2c, 0xe2, 0x40, 0xb3, 0xa0, 0x8e, 0xfd, 0x5b, 0x9e, 0x76, 0xf4, 0x01, + 0x88, 0x00, 0x9b, 0x4b, 0xcb, 0x8b, 0xb3, 0xd7, 0xe0, 0x94, 0xa9, 0xe5, 0x11, 0xf4, 0x14, 0x94, + 0xd0, 0xbf, 0x35, 0x6d, 0x7e, 0xbd, 0x40, 0xa2, 0xd2, 0xdd, 0xcd, 0x15, 0x67, 0x6c, 0x1c, 0x86, + 0x30, 0x5e, 0x46, 0xda, 0x4b, 0x80, 0xb4, 0xb6, 0xee, 0xbb, 0xe4, 0xa7, 0x2c, 0x1b, 0xc4, 0x5d, + 0xc4, 0xfa, 0x9b, 0xd2, 0x64, 0xae, 0x01, 0xcb, 0x33, 0xed, 0xf7, 0x05, 0x68, 0xcc, 0x59, 0xf5, + 0x1c, 0x53, 0x67, 0xf1, 0x03, 0x6a, 0x0e, 0x41, 0xf9, 0x92, 0x3a, 0x0b, 0x5e, 0x6c, 0x0d, 0xcc, + 0xd7, 0xe8, 0xf3, 0xd8, 0xb0, 0xc0, 0xbc, 0x8e, 0xda, 0x65, 0x7e, 0x7b, 0x2e, 0xbf, 0xbc, 0x10, + 0xc7, 0x56, 0x44, 0x4f, 0x4f, 0x70, 0x9d, 0xb3, 0x9e, 0x9e, 0x44, 0xda, 0x57, 0x50, 0x39, 0xe3, + 0x56, 0x3c, 0x05, 0x85, 0x2b, 0x37, 0x99, 0xb6, 0x18, 0xbb, 0xb9, 0xf0, 0x24, 0x16, 0x63, 0x88, + 0xe2, 0x65, 0xa4, 0xf5, 0x60, 0xe3, 0x44, 0x5a, 0xcb, 0x19, 0xde, 0xdf, 0x1d, 0xed, 0x2f, 0x45, + 0xa8, 0xbd, 0xf0, 0x57, 0xac, 0xa0, 0x50, 0x0b, 0x8a, 0xce, 0x82, 0xcb, 0x95, 0x70, 0xd1, 0x59, + 0xa0, 0xdf, 0x40, 0xcb, 0x73, 0x2e, 0x43, 0x8b, 0x95, 0xa5, 0x40, 0x98, 0x68, 0x12, 0x3f, 0xce, + 0x5a, 0x36, 0x89, 0x39, 0x38, 0xcc, 0x36, 0xbc, 0xec, 0x36, 0x03, 0x9c, 0x52, 0x0e, 0x38, 0x1f, + 0x43, 0xcb, 0xf5, 0x6d, 0xcb, 0x35, 0x93, 0xb6, 0x5d, 0x16, 0xc5, 0xcd, 0xa9, 0xa7, 0x71, 0xef, + 0xbe, 0x13, 0x97, 0xca, 0x03, 0xe3, 0x82, 0xbe, 0x80, 0x66, 0x60, 0x85, 0xd4, 0xb1, 0x9d, 0xc0, + 0x62, 0x83, 0x4f, 0x95, 0x0b, 0xe6, 0xcc, 0xce, 0xc5, 0x0d, 0xe7, 0xd8, 0xd1, 0xa7, 0xa0, 0x46, + 0xbc, 0x25, 0x99, 0xb7, 0x7e, 0x78, 0x7d, 0xe1, 0xfa, 0xb7, 0x51, 0xbb, 0xc6, 0xed, 0xdf, 0x14, + 0xf4, 0xd7, 0x31, 0x59, 0xfb, 0x53, 0x09, 0xaa, 0x67, 0xa2, 0x3a, 0x0f, 0xa1, 0xcc, 0x63, 0x24, + 0x86, 0x9b, 0xbd, 0xec, 0x65, 0x82, 0x83, 0x07, 0x88, 0xf3, 0xa0, 0x9f, 0x40, 0x83, 0x3a, 0x1e, + 0x89, 0xa8, 0xe5, 0x05, 0x3c, 0xa8, 0x25, 0x9c, 0x12, 0xbe, 0xb3, 0xc4, 0x54, 0x28, 0xb1, 0xde, + 0x21, 0xc2, 0xc4, 0x96, 0xe8, 0xe7, 0xd0, 0x60, 0x98, 0xe2, 0x03, 0x57, 0xbb, 0xc2, 0x41, 0xba, + 0x73, 0x07, 0x51, 0xfc, 0x5a, 0x5c, 0x0f, 0x63, 0x94, 0xfe, 0x0a, 0x14, 0x8e, 0x02, 0x29, 0x24, + 0x9a, 0xd6, 0x5e, 0xbe, 0x69, 0xc5, 0x68, 0xc3, 0x90, 0xf6, 0x79, 0xf4, 0x04, 0x2a, 0x37, 0xdc, + 0xa4, 0x9a, 0x1c, 0xfc, 0xb2, 0xce, 0xf1, 0xf0, 0x8b, 0x73, 0xf6, 0x55, 0xfd, 0xad, 0xa8, 0x26, + 0xde, 0xae, 0xee, 0x7c, 0x55, 0x65, 0xa1, 0xe1, 0x98, 0x87, 0x7b, 0xe5, 0xb9, 0xbc, 0x63, 0x31, + 0xaf, 0x3c, 0x17, 0x7d, 0x04, 0x4d, 0x7b, 0x15, 0x86, 0x7c, 0xd4, 0x74, 0x3c, 0xd2, 0xde, 0xe1, + 0xc1, 0x51, 0x24, 0xcd, 0x70, 0x3c, 0x82, 0x7e, 0x0d, 0x2d, 0xd7, 0x8a, 0x28, 0x03, 0x9b, 0x74, + 0x64, 0x97, 0x5f, 0x95, 0x43, 0x9c, 0x00, 0x9b, 0xf0, 0x44, 0x71, 0xd3, 0x8d, 0x76, 0x05, 0xcd, + 0x89, 0xb3, 0x74, 0x3c, 0xcb, 0xe5, 0xa0, 0x64, 0xc1, 0xce, 0xb4, 0x13, 0xbe, 0x7e, 0x58, 0x27, + 0x41, 0x1f, 0x82, 0xc2, 0x4c, 0xb0, 0x7d, 0x77, 0xe5, 0x2d, 0x45, 0x85, 0x97, 0x70, 0x23, 0x38, + 0x19, 0x08, 0x02, 0x43, 0xa7, 0xbc, 0x69, 0x6e, 0x5f, 0x11, 0xcf, 0x42, 0x9f, 0x25, 0x68, 0x10, + 0x08, 0x6f, 0xe7, 0x71, 0x94, 0x1a, 0x15, 0xe3, 0x44, 0xfb, 0x7b, 0x11, 0x5a, 0x67, 0x62, 0xee, + 0x88, 0x67, 0x9d, 0xaf, 0x60, 0x9b, 0x5c, 0x5c, 0x10, 0x9b, 0x3a, 0x37, 0xc4, 0xb4, 0x2d, 0xd7, + 0x25, 0xa1, 0x29, 0x51, 0xab, 0x74, 0x37, 0x3b, 0xe2, 0xff, 0xc7, 0x80, 0xd3, 0x47, 0x43, 0xbc, + 0x95, 0xf0, 0x4a, 0xd2, 0x02, 0xe9, 0xb0, 0xed, 0x78, 0x1e, 0x59, 0x38, 0x16, 0xcd, 0x2a, 0x10, + 0x6d, 0x7e, 0x57, 0x7a, 0x7a, 0x66, 0x1c, 0x5b, 0x94, 0xa4, 0x6a, 0x12, 0x89, 0x44, 0xcd, 0xc7, + 0xcc, 0x99, 0xf0, 0x32, 0x19, 0x9f, 0x36, 0xa4, 0xa4, 0xc1, 0x89, 0x58, 0x1e, 0xe6, 0x46, 0xb3, + 0xf2, 0x9d, 0xd1, 0x2c, 0xfd, 0x7c, 0x56, 0xee, 0xfd, 0x7c, 0x7e, 0x09, 0x9b, 0xa2, 0xc5, 0xc6, + 0xa9, 0x8f, 0x51, 0xfd, 0xbd, 0x7d, 0xb6, 0x49, 0xd3, 0x4d, 0xa4, 0x7d, 0x01, 0x9b, 0x49, 0x20, + 0xe5, 0xe8, 0x76, 0x08, 0x55, 0x5e, 0x3e, 0x71, 0x3a, 0xd0, 0xdb, 0x90, 0xc5, 0x92, 0x43, 0xfb, + 0x5d, 0x11, 0x50, 0x2c, 0xef, 0xdf, 0x46, 0xff, 0xa3, 0xc9, 0xd8, 0x81, 0x0a, 0xa7, 0xcb, 0x4c, + 0x88, 0x0d, 0x8b, 0x03, 0x0b, 0x6a, 0x70, 0x9d, 0xa4, 0x41, 0x08, 0xbf, 0x64, 0xbf, 0x98, 0x44, + 0x2b, 0x97, 0x62, 0xc9, 0xa1, 0xfd, 0xb5, 0x00, 0xdb, 0xb9, 0x38, 0xc8, 0x58, 0xa6, 0x88, 0x29, + 0xbc, 0x03, 0x31, 0x07, 0x50, 0x0f, 0xae, 0xdf, 0x81, 0xac, 0xe4, 0xf4, 0x3b, 0x5b, 0xe0, 0x87, + 0x50, 0x0e, 0x59, 0x2b, 0x16, 0xdf, 0xd7, 0xec, 0x40, 0xc2, 0xe9, 0x6c, 0xaa, 0xc9, 0xf9, 0x91, + 0x9b, 0x6a, 0xa4, 0xfd, 0x0e, 0x28, 0x99, 0xce, 0xc0, 0x5a, 0x49, 0xbe, 0xaa, 0x64, 0xea, 0xbe, + 0xb7, 0xa8, 0x94, 0x4c, 0x51, 0xb1, 0x2e, 0x6e, 0xfb, 0x5e, 0xe0, 0x12, 0x4a, 0x44, 0xca, 0xea, + 0x38, 0x25, 0x68, 0x5f, 0x83, 0x92, 0x91, 0xbc, 0x6f, 0x78, 0x49, 0x93, 0x50, 0xba, 0x37, 0x09, + 0xff, 0x28, 0xc0, 0x6e, 0x5a, 0xcc, 0x2b, 0x97, 0xfe, 0x5f, 0xd5, 0xa3, 0x16, 0xc2, 0xde, 0x5d, + 0xef, 0xde, 0xab, 0xca, 0x7e, 0x40, 0xed, 0x1c, 0x7e, 0x09, 0x4a, 0x66, 0x06, 0x67, 0x7f, 0xd5, + 0x47, 0xc7, 0xd3, 0x19, 0xd6, 0xd5, 0x47, 0xa8, 0x0e, 0xe5, 0xb9, 0x31, 0x3b, 0x55, 0x0b, 0x6c, + 0xa5, 0x7f, 0xad, 0x0f, 0xc4, 0xdf, 0x7f, 0xb6, 0x32, 0x25, 0x53, 0xe9, 0xf0, 0x5f, 0x05, 0x80, + 0xf4, 0x2b, 0x8f, 0x14, 0xa8, 0xbd, 0x9a, 0x9e, 0x4c, 0x67, 0xaf, 0xa7, 0x42, 0xc1, 0xb1, 0x31, + 0x1a, 0xaa, 0x05, 0xd4, 0x80, 0x8a, 0x78, 0x4f, 0x28, 0xb2, 0x1b, 0xe4, 0x63, 0x42, 0x09, 0x35, + 0xa1, 0x9e, 0xbc, 0x24, 0x94, 0x51, 0x0d, 0x4a, 0xc9, 0x7b, 0x81, 0x7c, 0x20, 0xa8, 0x32, 0x85, + 0x58, 0x3f, 0x1d, 0xf7, 0x06, 0xba, 0x5a, 0x63, 0x07, 0xc9, 0x53, 0x01, 0x40, 0x35, 0x7e, 0x27, + 0x60, 0x92, 0x73, 0xdd, 0x50, 0x81, 0xdd, 0x33, 0x33, 0x9e, 0xeb, 0x58, 0x55, 0x18, 0x0d, 0xcf, + 0x5e, 0xab, 0x4d, 0x46, 0x7b, 0x36, 0xd2, 0xc7, 0x43, 0x75, 0x03, 0x6d, 0x40, 0xe3, 0xb9, 0xde, + 0xc3, 0x46, 0x5f, 0xef, 0x19, 0x6a, 0x8b, 0x9d, 0x9c, 0x71, 0x03, 0x37, 0xd9, 0x35, 0x2f, 0x66, + 0xaf, 0xf0, 0xb4, 0x37, 0x56, 0x55, 0xb6, 0x39, 0xd3, 0xf1, 0x7c, 0x34, 0x9b, 0xaa, 0x5b, 0xec, + 0x9e, 0x71, 0x6f, 0x6e, 0x9c, 0x9e, 0xa8, 0xe8, 0xf0, 0x09, 0xfb, 0xb0, 0x65, 0xc7, 0x3c, 0x80, + 0xaa, 0xd1, 0xeb, 0x8f, 0xf5, 0xb9, 0xfa, 0x88, 0xad, 0xe7, 0xcf, 0x7b, 0x78, 0x38, 0x57, 0x0b, + 0xfd, 0x4f, 0xbf, 0x79, 0x72, 0xe3, 0x50, 0x12, 0x45, 0x1d, 0xc7, 0x3f, 0x12, 0xab, 0xa3, 0x4b, + 0xff, 0xe8, 0x86, 0x1e, 0xf1, 0x37, 0xb0, 0xa3, 0x14, 0x64, 0xe7, 0x55, 0x4e, 0xf9, 0xc5, 0x7f, + 0x02, 0x00, 0x00, 0xff, 0xff, 0xb6, 0x0c, 0x0f, 0x53, 0x5f, 0x13, 0x00, 0x00, } diff --git a/go/vt/vtcombo/tablet_map.go b/go/vt/vtcombo/tablet_map.go index ed5c45425f..14072726f5 100644 --- a/go/vt/vtcombo/tablet_map.go +++ b/go/vt/vtcombo/tablet_map.go @@ -462,8 +462,8 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context, callback func(* } // VStream is part of queryservice.QueryService. -func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, filter, send) +func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { + err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, tableLastPKs, filter, send) return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err)) } diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index db8652f1e4..2b8f765bc2 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -23,6 +23,8 @@ import ( "os" "testing" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/vttest" @@ -207,9 +209,13 @@ func TestMain(m *testing.M) { defer os.RemoveAll(cfg.SchemaDir) cfg.TabletHostName = *tabletHostName - + env, err := vttest.NewLocalTestEnvWithDirectory("", 9000, "/tmp/vttest") + if err != nil { + log.Errorf("err is %v", err) + } cluster = &vttest.LocalCluster{ Config: cfg, + Env: env, } if err := cluster.Setup(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) @@ -217,7 +223,6 @@ func TestMain(m *testing.M) { return 1 } defer cluster.TearDown() - vtParams = mysql.ConnParams{ Host: "localhost", Port: cluster.Env.PortForProtocol("vtcombo_mysql_port", ""), diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index f38e0c47e0..991e2c4c03 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -19,10 +19,14 @@ package endtoend import ( "context" "fmt" + "io" "testing" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/query" querypb "vitess.io/vitess/go/vt/proto/query" @@ -30,25 +34,32 @@ import ( "vitess.io/vitess/go/vt/vtgate/vtgateconn" ) -func TestVStream(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - +func initialize(ctx context.Context, t *testing.T) (*vtgateconn.VTGateConn, *mysql.Conn, *mysql.Conn, func()) { gconn, err := vtgateconn.Dial(ctx, grpcAddress) if err != nil { t.Fatal(err) } - defer gconn.Close() conn, err := mysql.Connect(ctx, &vtParams) if err != nil { t.Fatal(err) } - defer conn.Close() mconn, err := mysql.Connect(ctx, &mysqlParams) if err != nil { t.Fatal(err) } - defer conn.Close() + close := func() { + gconn.Close() + conn.Close() + mconn.Close() + } + return gconn, conn, mconn, close +} +func TestVStream(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() mpos, err := mconn.MasterPosition() if err != nil { @@ -128,3 +139,88 @@ func TestVStream(t *testing.T) { } cancel() } + +func TestVStreamCopyNoPos(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "ks", + Shard: "-80", + Gtid: "", + }}, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*/", + }}, + } + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + _, _ = conn, mconn + if err != nil { + t.Fatal(err) + } + require.NotNil(t, reader) + events, err := reader.Recv() + require.Errorf(t, err, "Stream needs a position or a table to copy") + require.Nil(t, events) + + cancel() +} + +func TestVStreamCopyBasic(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + lastPK := sqltypes.Result{ + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT32}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}}, + } + qr := sqltypes.ResultToProto3(&lastPK) + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: "ks", + Shard: "-80", + Gtid: "", + TablePKs: []*binlogdatapb.TableLastPK{{ + TableName: "t1", + Lastpk: qr, + }, + }, + }}, + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter) + _, _ = conn, mconn + if err != nil { + t.Fatal(err) + } + require.NotNil(t, reader) + for { + e, err := reader.Recv() + switch err { + case nil: + log.Infof("Received Events\n%v\n", e) + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + t.Fatalf("remote error: %v\n", err) + } + } +} diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 81ee3765e8..ebe0860177 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -1,8 +1,7 @@ /* Copyright 2019 The Vitess Authors. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. + You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 @@ -150,6 +149,8 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat newvgtid.ShardGtids = append(newvgtid.ShardGtids, sgtid) } } + //TODO add tablepk validations + return newvgtid, filter, nil } @@ -212,7 +213,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss) } // Safe to access sgtid.Gtid here (because it can't change until streaming begins). - err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, vs.filter, func(events []*binlogdatapb.VEvent) error { + err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -250,6 +251,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } eventss = nil sendevents = nil + case binlogdatapb.VEventType_LASTPK: + // don't send lastpk, sent as part of vgtid + if len(eventss) > 0 { + if err := vs.sendAll(sgtid, eventss); err != nil { + return err + } + eventss = nil + sendevents = nil + } + case binlogdatapb.VEventType_HEARTBEAT: // Remove all heartbeat events for now. // Otherwise they can accumulate indefinitely if there are no real events. diff --git a/go/vt/vttablet/grpcqueryservice/server.go b/go/vt/vttablet/grpcqueryservice/server.go index e7979e1497..65f903a6ea 100644 --- a/go/vt/vttablet/grpcqueryservice/server.go +++ b/go/vt/vttablet/grpcqueryservice/server.go @@ -341,7 +341,7 @@ func (q *query) VStream(request *binlogdatapb.VStreamRequest, stream queryservic request.EffectiveCallerId, request.ImmediateCallerId, ) - err = q.server.VStream(ctx, request.Target, request.Position, request.Filter, func(events []*binlogdatapb.VEvent) error { + err = q.server.VStream(ctx, request.Target, request.Position, request.TableLastPKs, request.Filter, func(events []*binlogdatapb.VEvent) error { return stream.Send(&binlogdatapb.VStreamResponse{ Events: events, }) diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index 3c3a62cb32..8c87a49f58 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -598,7 +598,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu } // VStream starts a VReplication stream. -func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { stream, err := func() (queryservicepb.Query_VStreamClient, error) { conn.mu.RLock() defer conn.mu.RUnlock() @@ -612,6 +612,7 @@ func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx), Position: position, Filter: filter, + TableLastPKs: tablePKs, } stream, err := conn.c.VStream(ctx, req) if err != nil { diff --git a/go/vt/vttablet/queryservice/queryservice.go b/go/vt/vttablet/queryservice/queryservice.go index 2f6a9b68ba..2dcf5ebab8 100644 --- a/go/vt/vttablet/queryservice/queryservice.go +++ b/go/vt/vttablet/queryservice/queryservice.go @@ -97,7 +97,7 @@ type QueryService interface { MessageAck(ctx context.Context, target *querypb.Target, name string, ids []*querypb.Value) (count int64, err error) // VStream streams VReplication events based on the specified filter. - VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error // VStreamRows streams rows of a table from the specified starting point. VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error diff --git a/go/vt/vttablet/queryservice/wrapped.go b/go/vt/vttablet/queryservice/wrapped.go index 2ac41122d4..c0af4a273c 100644 --- a/go/vt/vttablet/queryservice/wrapped.go +++ b/go/vt/vttablet/queryservice/wrapped.go @@ -236,9 +236,9 @@ func (ws *wrappedService) MessageAck(ctx context.Context, target *querypb.Target return count, err } -func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { return ws.wrapper(ctx, target, ws.impl, "VStream", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { - innerErr := conn.VStream(ctx, target, startPos, filter, send) + innerErr := conn.VStream(ctx, target, startPos, tableLastPKs, filter, send) return false, innerErr }) } diff --git a/go/vt/vttablet/sandboxconn/sandboxconn.go b/go/vt/vttablet/sandboxconn/sandboxconn.go index a37d50ae55..a4fd397028 100644 --- a/go/vt/vttablet/sandboxconn/sandboxconn.go +++ b/go/vt/vttablet/sandboxconn/sandboxconn.go @@ -344,7 +344,7 @@ func (sbc *SandboxConn) AddVStreamEvents(events []*binlogdatapb.VEvent, err erro } // VStream is part of the QueryService interface. -func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { if sbc.StartPos != "" && sbc.StartPos != startPos { return fmt.Errorf("startPos(%v): %v, want %v", target, startPos, sbc.StartPos) } diff --git a/go/vt/vttablet/tabletconntest/fakequeryservice.go b/go/vt/vttablet/tabletconntest/fakequeryservice.go index f49776f616..2d5993f8f7 100644 --- a/go/vt/vttablet/tabletconntest/fakequeryservice.go +++ b/go/vt/vttablet/tabletconntest/fakequeryservice.go @@ -666,14 +666,16 @@ var TestStreamHealthStreamHealthResponse = &querypb.StreamHealthResponse{ Shard: "test_shard", TabletType: topodatapb.TabletType_RDONLY, }, - Serving: true, + Serving: true, + TabletExternallyReparentedTimestamp: 1234589, + RealtimeStats: &querypb.RealtimeStats{ + CpuUsage: 1.0, HealthError: "random error", SecondsBehindMaster: 234, BinlogPlayersCount: 1, SecondsBehindMasterFilteredReplication: 2, - CpuUsage: 1.0, }, } @@ -697,7 +699,7 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer } // VStream is part of the queryservice.QueryService interface -func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { panic("not implemented") } diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index 1220a74ca5..d991cb4a31 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -46,7 +46,7 @@ type VStreamerClient interface { Close(context.Context) error // VStream streams VReplication events based on the specified filter. - VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error // VStreamRows streams rows of a table from the specified starting point. VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error @@ -124,7 +124,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { return nil } -func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) } @@ -169,8 +169,8 @@ func (tc *tabletConnector) Close(ctx context.Context) error { return tc.qs.Close(ctx) } -func (tc *tabletConnector) VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return tc.qs.VStream(ctx, tc.target, startPos, filter, send) +func (tc *tabletConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { + return tc.qs.VStream(ctx, tc.target, startPos, tablePKs, filter, send) } func (tc *tabletConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index d29e40676b..aa98039c31 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -224,7 +224,7 @@ func (ftc *fakeTabletConn) StreamHealth(ctx context.Context, callback func(*quer var vstreamHook func(ctx context.Context) // VStream directly calls into the pre-initialized engine. -func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { if target.Keyspace != "vttest" { <-ctx.Done() return io.EOF @@ -232,7 +232,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, startPos, nil, filter, send) + return streamerEngine.Stream(ctx, startPos, tablePKs, filter, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index f9cf771168..3a84acea71 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -19,8 +19,6 @@ package messager import ( "sync" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" - "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" @@ -46,7 +44,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index ec3763d252..778ecb80ec 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -26,8 +26,6 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" - "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/test/utils" @@ -893,7 +891,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go index 86f5f7c9e5..178bdeab8d 100644 --- a/go/vt/vttablet/tabletserver/replication_watcher.go +++ b/go/vt/vttablet/tabletserver/replication_watcher.go @@ -19,8 +19,6 @@ package tabletserver import ( "time" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" - "golang.org/x/net/context" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -33,7 +31,7 @@ import ( // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error } // ReplicationWatcher is a tabletserver service that watches the diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index f4311fa8fe..34246108c8 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1353,11 +1353,11 @@ func (tsv *TabletServer) execDML(ctx context.Context, target *querypb.Target, qu } // VStream streams VReplication events. -func (tsv *TabletServer) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (tsv *TabletServer) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { if err := tsv.verifyTarget(ctx, target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, startPos, nil, filter, send) + return tsv.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) } // VStreamRows streams rows from the specified starting point. diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 21f2be8b20..c11e22cb8b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -116,13 +116,34 @@ func (uvs *uvstreamer) sendEventsForRows(ctx context.Context, tableName string, return nil } +func getLastPKFromQR(qr *querypb.QueryResult) []sqltypes.Value { + var lastPK []sqltypes.Value + r := sqltypes.Proto3ToResult(qr) + if len(r.Rows) != 1 { + log.Errorf("unexpected lastpk input: %v", qr) + return nil + } + lastPK = r.Rows[0] + return lastPK +} + +func getQRFromLastPK(fields []*querypb.Field, lastPK []sqltypes.Value) *querypb.QueryResult { + row := sqltypes.RowToProto3(lastPK) + qr := &querypb.QueryResult{ + Fields: fields, + Rows: []*querypb.Row{row}, + } + return qr +} + func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { ctx, cancel := context.WithCancel(ctx) defer cancel() var err error var newLastPK *sqltypes.Result - lastPK := uvs.plans[tableName].tablePK.lastPK.Rows[0] + lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk) filter := uvs.plans[tableName].rule.Filter + log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK) uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName)) @@ -141,9 +162,9 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { pos, _ := mysql.DecodePosition(rows.Gtid) if !uvs.pos.IsZero() && !uvs.pos.AtLeast(pos) { uvs.fastForward(ctx, rows.Gtid) - } - if mysql.EncodePosition(uvs.pos) != rows.Gtid { - log.Errorf("Position after fastforward was %s but stopPos was %s", uvs.pos, rows.Gtid) + if mysql.EncodePosition(uvs.pos) != rows.Gtid { + log.Errorf("Position after fastforward was %s but stopPos was %s", uvs.pos, rows.Gtid) + } } fieldEvent := &binlogdatapb.FieldEvent{ @@ -155,9 +176,10 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error { uvs.sendFieldEvent(ctx, rows.Gtid, fieldEvent) } if len(rows.Rows) == 0 { - log.Infof("0 rows returned for table %s", tableName) + //log.Infof("0 rows returned for table %s", tableName) return nil } + uvs.sendEventsForRows(ctx, tableName, rows) newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 70396e705e..c49e5d6e5a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -38,12 +38,6 @@ import ( vschemapb "vitess.io/vitess/go/vt/proto/vschema" ) -// TableLastPK records the lastPK seen during the copy phase for a table -type TableLastPK struct { - name string - lastPK *sqltypes.Result -} - // Engine is the engine for handling vreplication streaming requests. type Engine struct { env tabletenv.Env @@ -151,7 +145,7 @@ func (vse *Engine) vschema() *vindexes.VSchema { } // Stream starts a new stream. -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { // Ensure vschema is initialized and the watcher is started. // Starting of the watcher has to be delayed till the first call to Stream // because this overhead should be incurred only if someone uses this feature. diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 67b6df54c0..618ceaf382 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -21,7 +21,7 @@ import ( var uvstreamerTestMode = false // Only used for testing type tablePlan struct { - tablePK *TableLastPK + tablePK *binlogdatapb.TableLastPK rule *binlogdatapb.Rule } @@ -69,7 +69,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, sh schema.Historian, startPos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, sh schema.Historian, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -98,8 +98,8 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se uvs.plans[rule.Match] = plan //TODO: only handles actual table name now, no regular expressions } for _, tablePK := range tablePKs { - uvs.plans[tablePK.name].tablePK = tablePK - uvs.tablesToCopy = append(uvs.tablesToCopy, tablePK.name) + uvs.plans[tablePK.TableName].tablePK = tablePK + uvs.tablesToCopy = append(uvs.tablesToCopy, tablePK.TableName) } sort.Strings(uvs.tablesToCopy) } @@ -136,16 +136,16 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. if !shouldSend && tableName != "" { shouldSend = true _, ok := uvs.plans[tableName] - if ok && uvs.plans[tableName].tablePK.lastPK.Rows[0] == nil { + if ok && uvs.plans[tableName].tablePK == nil { shouldSend = false } } if shouldSend { evs2 = append(evs2, ev) //log.Infof("shouldSend: sending %v table %s", ev.String(), tableName) - } else { - //log.Infof("shouldSend: filtering out %v", ev.String()) } + //log.Infof("shouldSend: filtering out %v", ev.String()) + } return evs2 } @@ -231,7 +231,7 @@ func (uvs *uvstreamer) init() error { return err } //startpos validation for tablepk != nil if uvs.pos.IsZero() && (len(uvs.plans) == 0) { - return fmt.Errorf("Stream needs atleast a position or a table to copy") + return fmt.Errorf("Stream needs a position or a table to copy") } return nil } @@ -271,8 +271,8 @@ func (uvs *uvstreamer) SetVSchema(vschema *localVSchema) { } func (uvs *uvstreamer) setCopyState(tableName string, lastPK *sqltypes.Result) { - uvs.plans[tableName].tablePK.lastPK = lastPK qr := sqltypes.ResultToProto3(lastPK) + uvs.plans[tableName].tablePK.Lastpk = qr lastPKEvent := &binlogdatapb.LastPKEvent{ TableLastPK: &binlogdatapb.TableLastPK{ TableName: tableName, diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go index e96e26ebfa..13db3e8d87 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go @@ -8,12 +8,12 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/binlogdata" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/proto/query" + + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/log" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) const ( @@ -66,8 +66,8 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { initialize(t) engine.se.Reload(context.Background()) - var rules []*binlogdata.Rule - var tablePKs []*TableLastPK + var rules []*binlogdatapb.Rule + var tablePKs []*binlogdatapb.TableLastPK for i, table := range testState.tables { rules = append(rules, getRule(table)) tablePKs = append(tablePKs, getTablePK(table, i+1)) @@ -200,20 +200,20 @@ func initialize(t *testing.T) { } } -func getRule(table string) *binlogdata.Rule { - return &binlogdata.Rule{ +func getRule(table string) *binlogdatapb.Rule { + return &binlogdatapb.Rule{ Match: table, Filter: fmt.Sprintf("select * from %s", table), } } -func getTablePK(table string, idx int) *TableLastPK { - return &TableLastPK{ - name: table, - lastPK: &sqltypes.Result{ - Fields: []*query.Field{{Name: fmt.Sprintf("id%d1", idx), Type: query.Type_INT32}}, - Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}}, - }, +func getTablePK(table string, idx int) *binlogdatapb.TableLastPK { + fields := []*query.Field{{Name: fmt.Sprintf("id%d1", idx), Type: query.Type_INT32}} + + lastPK := []sqltypes.Value{sqltypes.NewInt32(0)} + return &binlogdatapb.TableLastPK{ + TableName: table, + Lastpk: getQRFromLastPK(fields, lastPK), } } @@ -242,7 +242,7 @@ func getEventCallback(ctx context.Context, t *testing.T, event *binlogdatapb.VEv return nil } -func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*TableLastPK) { +func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index cc5f04973b..fb3c913041 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -23,12 +23,10 @@ import ( "strings" "testing" "time" - - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/log" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -137,22 +135,9 @@ func TestVStreamCopySimpleFlow(t *testing.T) { }}, } - var tablePKs []*TableLastPK - tablePKs = append(tablePKs, &TableLastPK{ - name: "t1", - lastPK: &sqltypes.Result{ - Fields: []*query.Field{{Name: "id11", Type: query.Type_INT32}}, - Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}}, - }, - }) - - tablePKs = append(tablePKs, &TableLastPK{ - name: "t2", - lastPK: &sqltypes.Result{ - Fields: []*query.Field{{Name: "id21", Type: query.Type_INT32}}, - Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}}, - }, - }) + var tablePKs []*binlogdatapb.TableLastPK + tablePKs = append(tablePKs, getTablePK("t1", 1)) + tablePKs = append(tablePKs, getTablePK("t2", 2)) t1FieldEvent := []string{"type:FIELD field_event: fields: > "} t2FieldEvent := []string{"type:FIELD field_event: fields: > "} @@ -1455,7 +1440,7 @@ func TestFilteredMultipleWhere(t *testing.T) { runCases(t, filter, testcases, "", nil) } -func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*TableLastPK) { +func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) { t.Helper() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1548,7 +1533,7 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ var lastPos string -func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*TableLastPK) <-chan []*binlogdatapb.VEvent { +func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) <-chan []*binlogdatapb.VEvent { if position == "" && len(tablePKs) == 0 { position = masterPosition(t) } @@ -1564,7 +1549,7 @@ func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, return ch } -func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error { +func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error { if filter == nil { filter = &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 36f7754732..2788810879 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -365,6 +365,7 @@ message VStreamRequest { string position = 4; Filter filter = 5; + repeated TableLastPK table_last_p_ks = 6; } // VStreamResponse is the response from VStreamer