VSCopy: vtgate-related changes

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Rohit Nayak 2020-05-27 22:59:20 +02:00
Родитель 2f8983a628
Коммит a24770f208
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: BA0A4E9168156524
23 изменённых файлов: 339 добавлений и 219 удалений

Просмотреть файл

@ -1499,6 +1499,7 @@ type VStreamRequest struct {
Target *query.Target `protobuf:"bytes,3,opt,name=target,proto3" json:"target,omitempty"`
Position string `protobuf:"bytes,4,opt,name=position,proto3" json:"position,omitempty"`
Filter *Filter `protobuf:"bytes,5,opt,name=filter,proto3" json:"filter,omitempty"`
TableLastPKs []*TableLastPK `protobuf:"bytes,6,rep,name=table_last_p_ks,json=tableLastPKs,proto3" json:"table_last_p_ks,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -1564,6 +1565,13 @@ func (m *VStreamRequest) GetFilter() *Filter {
return nil
}
func (m *VStreamRequest) GetTableLastPKs() []*TableLastPK {
if m != nil {
return m.TableLastPKs
}
return nil
}
// VStreamResponse is the response from VStreamer
type VStreamResponse struct {
Events []*VEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"`
@ -2004,123 +2012,124 @@ func init() {
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_5fd02bcb2e350dad) }
var fileDescriptor_5fd02bcb2e350dad = []byte{
// 1888 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0x4b, 0x73, 0xdb, 0xc8,
0x11, 0x36, 0x09, 0x3e, 0x1b, 0x12, 0x05, 0x8d, 0x1e, 0x61, 0x5c, 0xd9, 0x2d, 0x2d, 0x2a, 0x5e,
0x6b, 0x55, 0x15, 0x6a, 0xc3, 0x64, 0x9d, 0x4b, 0x36, 0x1b, 0x3e, 0x60, 0x99, 0x16, 0x48, 0xca,
0x43, 0x58, 0xde, 0xda, 0x0b, 0x0a, 0x06, 0x47, 0x12, 0x22, 0x80, 0x80, 0x81, 0xa1, 0xb4, 0xfc,
0x01, 0xa9, 0xe4, 0x9e, 0x5f, 0x91, 0x53, 0x0e, 0xb9, 0x26, 0xd7, 0xfc, 0x8a, 0x5c, 0x73, 0x49,
0xfe, 0x44, 0x6a, 0x1e, 0x00, 0x01, 0x79, 0x63, 0xcb, 0x5b, 0x95, 0x43, 0x72, 0x61, 0xcd, 0xf4,
0x74, 0xf7, 0xf4, 0xeb, 0xeb, 0x69, 0x10, 0xb4, 0xd7, 0xde, 0xc2, 0x0f, 0x2f, 0xe7, 0x0e, 0x75,
0x3a, 0x51, 0x1c, 0xd2, 0x10, 0xc1, 0x9a, 0xf2, 0x50, 0xbd, 0xa1, 0x71, 0xe4, 0x8a, 0x83, 0x87,
0xea, 0x9b, 0x25, 0x89, 0x57, 0x72, 0xd3, 0xa2, 0x61, 0x14, 0xae, 0xa5, 0xf4, 0x31, 0xd4, 0x07,
0x57, 0x4e, 0x9c, 0x10, 0x8a, 0xf6, 0xa1, 0xe6, 0xfa, 0x1e, 0x59, 0xd0, 0x76, 0xe9, 0xa0, 0x74,
0x58, 0xc5, 0x72, 0x87, 0x10, 0x54, 0xdc, 0x70, 0xb1, 0x68, 0x97, 0x39, 0x95, 0xaf, 0x19, 0x6f,
0x42, 0xe2, 0x1b, 0x12, 0xb7, 0x15, 0xc1, 0x2b, 0x76, 0xfa, 0x3f, 0x14, 0xd8, 0xee, 0x73, 0x3b,
0xac, 0xd8, 0x59, 0x24, 0x8e, 0x4b, 0xbd, 0x70, 0x81, 0x4e, 0x00, 0x12, 0xea, 0x50, 0x12, 0x90,
0x05, 0x4d, 0xda, 0xa5, 0x03, 0xe5, 0x50, 0xed, 0x3e, 0xee, 0xe4, 0x3c, 0x78, 0x4b, 0xa4, 0x33,
0x4b, 0xf9, 0x71, 0x4e, 0x14, 0x75, 0x41, 0x25, 0x37, 0x64, 0x41, 0x6d, 0x1a, 0x5e, 0x93, 0x45,
0xbb, 0x72, 0x50, 0x3a, 0x54, 0xbb, 0xdb, 0x1d, 0xe1, 0xa0, 0xc1, 0x4e, 0x2c, 0x76, 0x80, 0x81,
0x64, 0xeb, 0x87, 0x7f, 0x2b, 0x43, 0x33, 0xd3, 0x86, 0x4c, 0x68, 0xb8, 0x0e, 0x25, 0x97, 0x61,
0xbc, 0xe2, 0x6e, 0xb6, 0xba, 0x9f, 0xdf, 0xd3, 0x90, 0xce, 0x40, 0xca, 0xe1, 0x4c, 0x03, 0xfa,
0x09, 0xd4, 0x5d, 0x11, 0x3d, 0x1e, 0x1d, 0xb5, 0xbb, 0x93, 0x57, 0x26, 0x03, 0x8b, 0x53, 0x1e,
0xa4, 0x81, 0x92, 0xbc, 0xf1, 0x79, 0xc8, 0x36, 0x30, 0x5b, 0xea, 0x7f, 0x2c, 0x41, 0x23, 0xd5,
0x8b, 0x76, 0x60, 0xab, 0x6f, 0xda, 0x2f, 0x27, 0xd8, 0x18, 0x4c, 0x4f, 0x26, 0xa3, 0x6f, 0x8c,
0xa1, 0xf6, 0x00, 0x6d, 0x40, 0xa3, 0x6f, 0xda, 0x7d, 0xe3, 0x64, 0x34, 0xd1, 0x4a, 0x68, 0x13,
0x9a, 0x7d, 0xd3, 0x1e, 0x4c, 0xc7, 0xe3, 0x91, 0xa5, 0x95, 0xd1, 0x16, 0xa8, 0x7d, 0xd3, 0xc6,
0x53, 0xd3, 0xec, 0xf7, 0x06, 0xa7, 0x9a, 0x82, 0xf6, 0x60, 0xbb, 0x6f, 0xda, 0xc3, 0xb1, 0x69,
0x0f, 0x8d, 0x33, 0x6c, 0x0c, 0x7a, 0x96, 0x31, 0xd4, 0x2a, 0x08, 0xa0, 0xc6, 0xc8, 0x43, 0x53,
0xab, 0xca, 0xf5, 0xcc, 0xb0, 0xb4, 0x9a, 0x54, 0x37, 0x9a, 0xcc, 0x0c, 0x6c, 0x69, 0x75, 0xb9,
0x7d, 0x79, 0x36, 0xec, 0x59, 0x86, 0xd6, 0x90, 0xdb, 0xa1, 0x61, 0x1a, 0x96, 0xa1, 0x35, 0x9f,
0x57, 0x1a, 0x65, 0x4d, 0x79, 0x5e, 0x69, 0x28, 0x5a, 0x45, 0xff, 0x43, 0x09, 0xf6, 0x66, 0x34,
0x26, 0x4e, 0x70, 0x4a, 0x56, 0xd8, 0x59, 0x5c, 0x12, 0x4c, 0xde, 0x2c, 0x49, 0x42, 0xd1, 0x43,
0x68, 0x44, 0x61, 0xe2, 0xb1, 0xd8, 0xf1, 0x00, 0x37, 0x71, 0xb6, 0x47, 0xc7, 0xd0, 0xbc, 0x26,
0x2b, 0x3b, 0x66, 0xfc, 0x32, 0x60, 0xa8, 0x93, 0x15, 0x64, 0xa6, 0xa9, 0x71, 0x2d, 0x57, 0xf9,
0xf8, 0x2a, 0xef, 0x8f, 0xaf, 0x7e, 0x01, 0xfb, 0x77, 0x8d, 0x4a, 0xa2, 0x70, 0x91, 0x10, 0x64,
0x02, 0x12, 0x82, 0x36, 0x5d, 0xe7, 0x96, 0xdb, 0xa7, 0x76, 0x3f, 0x7a, 0x67, 0x01, 0xe0, 0xed,
0xd7, 0x77, 0x49, 0xfa, 0xb7, 0xb0, 0x23, 0xee, 0xb1, 0x9c, 0xd7, 0x3e, 0x49, 0xee, 0xe3, 0xfa,
0x3e, 0xd4, 0x28, 0x67, 0x6e, 0x97, 0x0f, 0x94, 0xc3, 0x26, 0x96, 0xbb, 0x0f, 0xf5, 0x70, 0x0e,
0xbb, 0xc5, 0x9b, 0xff, 0x2b, 0xfe, 0xfd, 0x1c, 0x2a, 0x78, 0xe9, 0x13, 0xb4, 0x0b, 0xd5, 0xc0,
0xa1, 0xee, 0x95, 0xf4, 0x46, 0x6c, 0x98, 0x2b, 0x17, 0x9e, 0x4f, 0x49, 0xcc, 0x53, 0xd8, 0xc4,
0x72, 0xa7, 0xff, 0xb9, 0x04, 0xb5, 0xa7, 0x7c, 0x89, 0x3e, 0x85, 0x6a, 0xbc, 0x64, 0xce, 0x0a,
0xac, 0x6b, 0x79, 0x0b, 0x98, 0x66, 0x2c, 0x8e, 0xd1, 0x08, 0x5a, 0x17, 0x1e, 0xf1, 0xe7, 0x1c,
0xba, 0xe3, 0x70, 0x2e, 0xaa, 0xa2, 0xd5, 0xfd, 0x24, 0x2f, 0x20, 0x74, 0x76, 0x9e, 0x16, 0x18,
0xf1, 0x1d, 0x41, 0xfd, 0x09, 0xb4, 0x8a, 0x1c, 0x0c, 0x4e, 0x06, 0xc6, 0xf6, 0x74, 0x62, 0x8f,
0x47, 0xb3, 0x71, 0xcf, 0x1a, 0x3c, 0xd3, 0x1e, 0x70, 0xc4, 0x18, 0x33, 0xcb, 0x36, 0x9e, 0x3e,
0x9d, 0x62, 0x4b, 0x2b, 0xe9, 0xff, 0x2a, 0xc3, 0x86, 0x08, 0xca, 0x2c, 0x5c, 0xc6, 0x2e, 0x61,
0x59, 0xbc, 0x26, 0xab, 0x24, 0x72, 0x5c, 0x92, 0x66, 0x31, 0xdd, 0xb3, 0x80, 0x24, 0x57, 0x4e,
0x3c, 0x97, 0x9e, 0x8b, 0x0d, 0xfa, 0x02, 0x54, 0x9e, 0x4d, 0x6a, 0xd3, 0x55, 0x44, 0x78, 0x1e,
0x5b, 0xdd, 0xdd, 0x75, 0x61, 0xf3, 0x5c, 0x51, 0x6b, 0x15, 0x11, 0x0c, 0x34, 0x5b, 0x17, 0xd1,
0x50, 0xb9, 0x07, 0x1a, 0xd6, 0x35, 0x54, 0x2d, 0xd4, 0xd0, 0x51, 0x96, 0x90, 0x9a, 0xd4, 0xf2,
0x56, 0xf4, 0xd2, 0x24, 0xa1, 0x0e, 0xd4, 0xc2, 0x85, 0x3d, 0x9f, 0xfb, 0xed, 0x3a, 0x37, 0xf3,
0x07, 0x79, 0xde, 0xe9, 0x62, 0x38, 0x34, 0x7b, 0xa2, 0x2c, 0xaa, 0xe1, 0x62, 0x38, 0xf7, 0xd1,
0x23, 0x68, 0x91, 0x6f, 0x29, 0x89, 0x17, 0x8e, 0x6f, 0x07, 0x2b, 0xd6, 0xbd, 0x1a, 0xdc, 0xf5,
0xcd, 0x94, 0x3a, 0x66, 0x44, 0xf4, 0x29, 0x6c, 0x25, 0x34, 0x8c, 0x6c, 0xe7, 0x82, 0x92, 0xd8,
0x76, 0xc3, 0x68, 0xd5, 0x6e, 0x1e, 0x94, 0x0e, 0x1b, 0x78, 0x93, 0x91, 0x7b, 0x8c, 0x3a, 0x08,
0xa3, 0x95, 0xfe, 0x02, 0x9a, 0x38, 0xbc, 0x1d, 0x5c, 0x71, 0x7f, 0x74, 0xa8, 0xbd, 0x26, 0x17,
0x61, 0x4c, 0x64, 0xa1, 0x82, 0x6c, 0xe4, 0x38, 0xbc, 0xc5, 0xf2, 0x04, 0x1d, 0x40, 0x95, 0xeb,
0x94, 0xed, 0x22, 0xcf, 0x22, 0x0e, 0x74, 0x07, 0x1a, 0x38, 0xbc, 0xe5, 0x69, 0x47, 0x1f, 0x81,
0x08, 0xb0, 0xbd, 0x70, 0x82, 0x34, 0x7b, 0x4d, 0x4e, 0x99, 0x38, 0x01, 0x41, 0x4f, 0x40, 0x8d,
0xc3, 0x5b, 0xdb, 0xe5, 0xd7, 0x0b, 0x24, 0xaa, 0xdd, 0xbd, 0x42, 0x71, 0xa6, 0xc6, 0x61, 0x88,
0xd3, 0x65, 0xa2, 0xbf, 0x00, 0x58, 0xd7, 0xd6, 0xfb, 0x2e, 0xf9, 0x31, 0xcb, 0x06, 0xf1, 0xe7,
0xa9, 0xfe, 0x0d, 0x69, 0x32, 0xd7, 0x80, 0xe5, 0x99, 0xfe, 0xfb, 0x12, 0x34, 0x67, 0xac, 0x7a,
0x4e, 0xa8, 0x37, 0xff, 0x1e, 0x35, 0x87, 0xa0, 0x72, 0x49, 0xbd, 0x39, 0x2f, 0xb6, 0x26, 0xe6,
0x6b, 0xf4, 0x45, 0x6a, 0x58, 0x64, 0x5f, 0x27, 0xed, 0x0a, 0xbf, 0xbd, 0x90, 0x5f, 0x5e, 0x88,
0xa6, 0x93, 0xd0, 0xb3, 0x53, 0xdc, 0xe0, 0xac, 0x67, 0xa7, 0x89, 0xfe, 0x15, 0x54, 0xcf, 0xb9,
0x15, 0x4f, 0x40, 0xe5, 0xca, 0x6d, 0xa6, 0x2d, 0xc5, 0x6e, 0x21, 0x3c, 0x99, 0xc5, 0x18, 0x92,
0x74, 0x99, 0xe8, 0x3d, 0xd8, 0x3c, 0x95, 0xd6, 0x72, 0x86, 0x0f, 0x77, 0x47, 0xff, 0x4b, 0x19,
0xea, 0xcf, 0xc3, 0x25, 0x2b, 0x28, 0xd4, 0x82, 0xb2, 0x37, 0xe7, 0x72, 0x0a, 0x2e, 0x7b, 0x73,
0xf4, 0x6b, 0x68, 0x05, 0xde, 0x65, 0xec, 0xb0, 0xb2, 0x14, 0x08, 0x13, 0x4d, 0xe2, 0x87, 0x79,
0xcb, 0xc6, 0x29, 0x07, 0x87, 0xd9, 0x66, 0x90, 0xdf, 0xe6, 0x80, 0xa3, 0x14, 0x80, 0xf3, 0x08,
0x5a, 0x7e, 0xe8, 0x3a, 0xbe, 0x9d, 0xb5, 0xed, 0x8a, 0x28, 0x6e, 0x4e, 0x3d, 0x4b, 0x7b, 0xf7,
0x9d, 0xb8, 0x54, 0xef, 0x19, 0x17, 0xf4, 0x25, 0x6c, 0x44, 0x4e, 0x4c, 0x3d, 0xd7, 0x8b, 0x1c,
0x36, 0xf8, 0xd4, 0xb8, 0x60, 0xc1, 0xec, 0x42, 0xdc, 0x70, 0x81, 0x1d, 0x7d, 0x06, 0x5a, 0xc2,
0x5b, 0x92, 0x7d, 0x1b, 0xc6, 0xd7, 0x17, 0x7e, 0x78, 0x9b, 0xb4, 0xeb, 0xdc, 0xfe, 0x2d, 0x41,
0x7f, 0x95, 0x92, 0xf5, 0x3f, 0x29, 0x50, 0x3b, 0x17, 0xd5, 0x79, 0x04, 0x15, 0x1e, 0x23, 0x31,
0xdc, 0xec, 0xe7, 0x2f, 0x13, 0x1c, 0x3c, 0x40, 0x9c, 0x07, 0xfd, 0x08, 0x9a, 0xd4, 0x0b, 0x48,
0x42, 0x9d, 0x20, 0xe2, 0x41, 0x55, 0xf0, 0x9a, 0xf0, 0x9d, 0x25, 0xa6, 0x81, 0xc2, 0x7a, 0x87,
0x08, 0x13, 0x5b, 0xa2, 0x9f, 0x42, 0x93, 0x61, 0x8a, 0x0f, 0x5c, 0xed, 0x2a, 0x07, 0xe9, 0xee,
0x1d, 0x44, 0xf1, 0x6b, 0x71, 0x23, 0x4e, 0x51, 0xfa, 0x0b, 0x50, 0x39, 0x0a, 0xa4, 0x90, 0x68,
0x5a, 0xfb, 0xc5, 0xa6, 0x95, 0xa2, 0x0d, 0xc3, 0xba, 0xcf, 0xa3, 0xc7, 0x50, 0xbd, 0xe1, 0x26,
0xd5, 0xe5, 0xe0, 0x97, 0x77, 0x8e, 0x87, 0x5f, 0x9c, 0xb3, 0x57, 0xf5, 0x37, 0xa2, 0x9a, 0x78,
0xbb, 0xba, 0xf3, 0xaa, 0xca, 0x42, 0xc3, 0x29, 0x0f, 0xf7, 0x2a, 0xf0, 0x79, 0xc7, 0x62, 0x5e,
0x05, 0x3e, 0xfa, 0x04, 0x36, 0xdc, 0x65, 0x1c, 0xf3, 0x51, 0xd3, 0x0b, 0x48, 0x7b, 0x97, 0x07,
0x47, 0x95, 0x34, 0xcb, 0x0b, 0x08, 0xfa, 0x25, 0xb4, 0x7c, 0x27, 0xa1, 0x0c, 0x6c, 0xd2, 0x91,
0x3d, 0x7e, 0x55, 0x01, 0x71, 0x02, 0x6c, 0xc2, 0x13, 0xd5, 0x5f, 0x6f, 0xf4, 0x2b, 0xd8, 0x18,
0x7b, 0x0b, 0x2f, 0x70, 0x7c, 0x0e, 0x4a, 0x16, 0xec, 0x5c, 0x3b, 0xe1, 0xeb, 0xfb, 0x75, 0x12,
0xf4, 0x31, 0xa8, 0xcc, 0x04, 0x37, 0xf4, 0x97, 0xc1, 0x42, 0x54, 0xb8, 0x82, 0x9b, 0xd1, 0xe9,
0x40, 0x10, 0x18, 0x3a, 0xe5, 0x4d, 0x33, 0xf7, 0x8a, 0x04, 0x0e, 0xfa, 0x3c, 0x43, 0x83, 0x40,
0x78, 0xbb, 0x88, 0xa3, 0xb5, 0x51, 0x29, 0x4e, 0xf4, 0xdf, 0x95, 0xa1, 0x75, 0x2e, 0xe6, 0x8e,
0x74, 0xd6, 0xf9, 0x0a, 0x76, 0xc8, 0xc5, 0x05, 0x71, 0xa9, 0x77, 0x43, 0x6c, 0xd7, 0xf1, 0x7d,
0x12, 0xdb, 0x12, 0xb5, 0x6a, 0x77, 0xab, 0x23, 0xbe, 0x3f, 0x06, 0x9c, 0x3e, 0x1a, 0xe2, 0xed,
0x8c, 0x57, 0x92, 0xe6, 0xc8, 0x80, 0x1d, 0x2f, 0x08, 0xc8, 0xdc, 0x73, 0x68, 0x5e, 0x81, 0x68,
0xf3, 0x7b, 0xd2, 0xd3, 0x73, 0xeb, 0xc4, 0xa1, 0x64, 0xad, 0x26, 0x93, 0xc8, 0xd4, 0x3c, 0x62,
0xce, 0xc4, 0x97, 0xd9, 0xf8, 0xb4, 0x29, 0x25, 0x2d, 0x4e, 0xc4, 0xf2, 0xb0, 0x30, 0x9a, 0x55,
0xee, 0x8c, 0x66, 0xeb, 0xe7, 0xb3, 0xfa, 0xbe, 0xe7, 0x53, 0xff, 0x12, 0xb6, 0xb2, 0x40, 0xc8,
0xd1, 0xeb, 0x08, 0x6a, 0x3c, 0xfd, 0x69, 0x38, 0xd1, 0xdb, 0x90, 0xc3, 0x92, 0x43, 0xff, 0x6d,
0x19, 0x50, 0x2a, 0x1f, 0xde, 0x26, 0xff, 0xa3, 0xc1, 0xdc, 0x85, 0x2a, 0xa7, 0xcb, 0x48, 0x8a,
0x0d, 0x8b, 0x03, 0x2b, 0xf0, 0xe8, 0x3a, 0x0b, 0xa3, 0x10, 0x7e, 0xc1, 0x7e, 0x31, 0x49, 0x96,
0x3e, 0xc5, 0x92, 0x43, 0xff, 0x6b, 0x09, 0x76, 0x0a, 0x71, 0x90, 0xb1, 0x5c, 0x57, 0x7c, 0xe9,
0x1d, 0x15, 0x7f, 0x08, 0x8d, 0xe8, 0xfa, 0x1d, 0xc8, 0xc8, 0x4e, 0xbf, 0xb3, 0x85, 0x7d, 0x0c,
0x95, 0x98, 0xb5, 0x52, 0xf1, 0x3e, 0xe6, 0x07, 0x0a, 0x4e, 0x67, 0x53, 0x49, 0xc1, 0x8f, 0xc2,
0x54, 0x22, 0xed, 0xf7, 0x40, 0xcd, 0x21, 0x9b, 0xb5, 0x02, 0xf1, 0xf0, 0xa6, 0x0d, 0x41, 0xa6,
0xee, 0x3f, 0x3e, 0xbe, 0x62, 0x5e, 0x14, 0x1b, 0xd6, 0x85, 0xdd, 0x30, 0x88, 0x7c, 0x42, 0x89,
0x48, 0x59, 0x03, 0xaf, 0x09, 0xfa, 0xd7, 0xa0, 0xe6, 0x24, 0xdf, 0x37, 0x7c, 0xac, 0x93, 0xa0,
0xbc, 0x37, 0x09, 0x7f, 0x2f, 0xc1, 0xde, 0xba, 0x98, 0x97, 0x3e, 0xfd, 0xbf, 0xaa, 0x47, 0x3d,
0x86, 0xfd, 0xbb, 0xde, 0x7d, 0x50, 0x95, 0x7d, 0x8f, 0xda, 0x39, 0xfa, 0x15, 0xa8, 0xb9, 0x19,
0x9a, 0x7d, 0x6a, 0x8f, 0x4e, 0x26, 0x53, 0x6c, 0x68, 0x0f, 0x50, 0x03, 0x2a, 0x33, 0x6b, 0x7a,
0xa6, 0x95, 0xd8, 0xca, 0xf8, 0xda, 0x18, 0x88, 0xcf, 0x77, 0xb6, 0xb2, 0x25, 0x93, 0x72, 0xf4,
0xcf, 0x12, 0xc0, 0xfa, 0x95, 0x46, 0x2a, 0xd4, 0x5f, 0x4e, 0x4e, 0x27, 0xd3, 0x57, 0x13, 0xa1,
0xe0, 0xc4, 0x1a, 0x0d, 0xb5, 0x12, 0x6a, 0x42, 0x55, 0xfc, 0x1f, 0x50, 0x66, 0x37, 0xc8, 0x3f,
0x03, 0x14, 0xb4, 0x01, 0x8d, 0xec, 0x9f, 0x80, 0x0a, 0xaa, 0x83, 0x92, 0x7d, 0xef, 0xcb, 0x0f,
0xfc, 0x1a, 0x53, 0x88, 0x8d, 0x33, 0xb3, 0x37, 0x30, 0xb4, 0x3a, 0x3b, 0xc8, 0x3e, 0xf5, 0x01,
0x6a, 0xe9, 0x77, 0x3e, 0x93, 0x9c, 0x19, 0x96, 0x06, 0xec, 0x9e, 0xa9, 0xf5, 0xcc, 0xc0, 0x9a,
0xca, 0x68, 0x78, 0xfa, 0x4a, 0xdb, 0x60, 0xb4, 0xa7, 0x23, 0xc3, 0x1c, 0x6a, 0x9b, 0x68, 0x13,
0x9a, 0xcf, 0x8c, 0x1e, 0xb6, 0xfa, 0x46, 0xcf, 0xd2, 0x5a, 0xec, 0xe4, 0x9c, 0x1b, 0xb8, 0xc5,
0xae, 0x79, 0x3e, 0x7d, 0x89, 0x27, 0x3d, 0x53, 0xd3, 0xd8, 0xe6, 0xdc, 0xc0, 0xb3, 0xd1, 0x74,
0xa2, 0x6d, 0xb3, 0x7b, 0xcc, 0xde, 0xcc, 0x3a, 0x3b, 0xd5, 0xd0, 0xd1, 0x63, 0xf6, 0x30, 0xe5,
0xc7, 0x34, 0x80, 0x9a, 0xd5, 0xeb, 0x9b, 0xc6, 0x4c, 0x7b, 0xc0, 0xd6, 0xb3, 0x67, 0x3d, 0x3c,
0x9c, 0x69, 0xa5, 0xfe, 0x67, 0xdf, 0x3c, 0xbe, 0xf1, 0x28, 0x49, 0x92, 0x8e, 0x17, 0x1e, 0x8b,
0xd5, 0xf1, 0x65, 0x78, 0x7c, 0x43, 0x8f, 0xf9, 0x7f, 0x58, 0xc7, 0x6b, 0x90, 0xbd, 0xae, 0x71,
0xca, 0xcf, 0xfe, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x74, 0x10, 0xf9, 0x24, 0x1f, 0x13, 0x00, 0x00,
// 1901 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0x4b, 0x73, 0xe3, 0xc6,
0x11, 0x5e, 0xbe, 0xc9, 0x06, 0x45, 0x41, 0xa3, 0x47, 0x98, 0xad, 0xd8, 0x25, 0xa3, 0x62, 0xaf,
0xac, 0xaa, 0x50, 0x0e, 0x13, 0x6f, 0x2e, 0xb1, 0x1d, 0x3e, 0xb0, 0x5a, 0xae, 0xf8, 0xd0, 0x0e,
0xb1, 0x5a, 0x97, 0x2f, 0x28, 0x08, 0x1c, 0x49, 0x88, 0x00, 0x02, 0x0b, 0x0c, 0x25, 0xf3, 0x07,
0xa4, 0x2a, 0xf7, 0xfc, 0x8a, 0x9c, 0x72, 0xc8, 0x35, 0xb9, 0x26, 0x7f, 0x22, 0xd7, 0x5c, 0x92,
0x3f, 0x91, 0x9a, 0x07, 0x5e, 0x5a, 0x7b, 0xa5, 0x75, 0x55, 0x0e, 0xc9, 0x85, 0x35, 0xd3, 0xd3,
0xdd, 0xd3, 0xaf, 0xaf, 0xd1, 0x1c, 0x50, 0xcf, 0x9d, 0xa5, 0xeb, 0x5f, 0x2e, 0x2c, 0x6a, 0x75,
0x82, 0xd0, 0xa7, 0x3e, 0x82, 0x94, 0xf2, 0x58, 0xb9, 0xa1, 0x61, 0x60, 0x8b, 0x83, 0xc7, 0xca,
0x9b, 0x15, 0x09, 0xd7, 0x72, 0xd3, 0xa2, 0x7e, 0xe0, 0xa7, 0x52, 0xda, 0x04, 0x6a, 0x83, 0x2b,
0x2b, 0x8c, 0x08, 0x45, 0x7b, 0x50, 0xb5, 0x5d, 0x87, 0x2c, 0x69, 0xbb, 0xb0, 0x5f, 0x38, 0xa8,
0x60, 0xb9, 0x43, 0x08, 0xca, 0xb6, 0xbf, 0x5c, 0xb6, 0x8b, 0x9c, 0xca, 0xd7, 0x8c, 0x37, 0x22,
0xe1, 0x0d, 0x09, 0xdb, 0x25, 0xc1, 0x2b, 0x76, 0xda, 0x3f, 0x4b, 0xb0, 0xd5, 0xe7, 0x76, 0x18,
0xa1, 0xb5, 0x8c, 0x2c, 0x9b, 0x3a, 0xfe, 0x12, 0x1d, 0x03, 0x44, 0xd4, 0xa2, 0xc4, 0x23, 0x4b,
0x1a, 0xb5, 0x0b, 0xfb, 0xa5, 0x03, 0xa5, 0xfb, 0xa4, 0x93, 0xf1, 0xe0, 0x2d, 0x91, 0xce, 0x3c,
0xe6, 0xc7, 0x19, 0x51, 0xd4, 0x05, 0x85, 0xdc, 0x90, 0x25, 0x35, 0xa9, 0x7f, 0x4d, 0x96, 0xed,
0xf2, 0x7e, 0xe1, 0x40, 0xe9, 0x6e, 0x75, 0x84, 0x83, 0x3a, 0x3b, 0x31, 0xd8, 0x01, 0x06, 0x92,
0xac, 0x1f, 0xff, 0xad, 0x08, 0x8d, 0x44, 0x1b, 0x1a, 0x43, 0xdd, 0xb6, 0x28, 0xb9, 0xf4, 0xc3,
0x35, 0x77, 0xb3, 0xd5, 0xfd, 0xec, 0x81, 0x86, 0x74, 0x06, 0x52, 0x0e, 0x27, 0x1a, 0xd0, 0xcf,
0xa0, 0x66, 0x8b, 0xe8, 0xf1, 0xe8, 0x28, 0xdd, 0xed, 0xac, 0x32, 0x19, 0x58, 0x1c, 0xf3, 0x20,
0x15, 0x4a, 0xd1, 0x1b, 0x97, 0x87, 0xac, 0x89, 0xd9, 0x52, 0xfb, 0x63, 0x01, 0xea, 0xb1, 0x5e,
0xb4, 0x0d, 0x9b, 0xfd, 0xb1, 0xf9, 0x6a, 0x8a, 0xf5, 0xc1, 0xec, 0x78, 0x3a, 0xfa, 0x46, 0x1f,
0xaa, 0x8f, 0x50, 0x13, 0xea, 0xfd, 0xb1, 0xd9, 0xd7, 0x8f, 0x47, 0x53, 0xb5, 0x80, 0x36, 0xa0,
0xd1, 0x1f, 0x9b, 0x83, 0xd9, 0x64, 0x32, 0x32, 0xd4, 0x22, 0xda, 0x04, 0xa5, 0x3f, 0x36, 0xf1,
0x6c, 0x3c, 0xee, 0xf7, 0x06, 0x27, 0x6a, 0x09, 0xed, 0xc2, 0x56, 0x7f, 0x6c, 0x0e, 0x27, 0x63,
0x73, 0xa8, 0x9f, 0x62, 0x7d, 0xd0, 0x33, 0xf4, 0xa1, 0x5a, 0x46, 0x00, 0x55, 0x46, 0x1e, 0x8e,
0xd5, 0x8a, 0x5c, 0xcf, 0x75, 0x43, 0xad, 0x4a, 0x75, 0xa3, 0xe9, 0x5c, 0xc7, 0x86, 0x5a, 0x93,
0xdb, 0x57, 0xa7, 0xc3, 0x9e, 0xa1, 0xab, 0x75, 0xb9, 0x1d, 0xea, 0x63, 0xdd, 0xd0, 0xd5, 0xc6,
0x8b, 0x72, 0xbd, 0xa8, 0x96, 0x5e, 0x94, 0xeb, 0x25, 0xb5, 0xac, 0xfd, 0xa1, 0x00, 0xbb, 0x73,
0x1a, 0x12, 0xcb, 0x3b, 0x21, 0x6b, 0x6c, 0x2d, 0x2f, 0x09, 0x26, 0x6f, 0x56, 0x24, 0xa2, 0xe8,
0x31, 0xd4, 0x03, 0x3f, 0x72, 0x58, 0xec, 0x78, 0x80, 0x1b, 0x38, 0xd9, 0xa3, 0x23, 0x68, 0x5c,
0x93, 0xb5, 0x19, 0x32, 0x7e, 0x19, 0x30, 0xd4, 0x49, 0x0a, 0x32, 0xd1, 0x54, 0xbf, 0x96, 0xab,
0x6c, 0x7c, 0x4b, 0xf7, 0xc7, 0x57, 0xbb, 0x80, 0xbd, 0xbb, 0x46, 0x45, 0x81, 0xbf, 0x8c, 0x08,
0x1a, 0x03, 0x12, 0x82, 0x26, 0x4d, 0x73, 0xcb, 0xed, 0x53, 0xba, 0x1f, 0xbc, 0xb3, 0x00, 0xf0,
0xd6, 0xf9, 0x5d, 0x92, 0xf6, 0x2d, 0x6c, 0x8b, 0x7b, 0x0c, 0xeb, 0xdc, 0x25, 0xd1, 0x43, 0x5c,
0xdf, 0x83, 0x2a, 0xe5, 0xcc, 0xed, 0xe2, 0x7e, 0xe9, 0xa0, 0x81, 0xe5, 0xee, 0x7d, 0x3d, 0x5c,
0xc0, 0x4e, 0xfe, 0xe6, 0xff, 0x8a, 0x7f, 0xbf, 0x84, 0x32, 0x5e, 0xb9, 0x04, 0xed, 0x40, 0xc5,
0xb3, 0xa8, 0x7d, 0x25, 0xbd, 0x11, 0x1b, 0xe6, 0xca, 0x85, 0xe3, 0x52, 0x12, 0xf2, 0x14, 0x36,
0xb0, 0xdc, 0x69, 0x7f, 0x2e, 0x40, 0xf5, 0x19, 0x5f, 0xa2, 0x4f, 0xa0, 0x12, 0xae, 0x98, 0xb3,
0x02, 0xeb, 0x6a, 0xd6, 0x02, 0xa6, 0x19, 0x8b, 0x63, 0x34, 0x82, 0xd6, 0x85, 0x43, 0xdc, 0x05,
0x87, 0xee, 0xc4, 0x5f, 0x88, 0xaa, 0x68, 0x75, 0x3f, 0xca, 0x0a, 0x08, 0x9d, 0x9d, 0x67, 0x39,
0x46, 0x7c, 0x47, 0x50, 0x7b, 0x0a, 0xad, 0x3c, 0x07, 0x83, 0x93, 0x8e, 0xb1, 0x39, 0x9b, 0x9a,
0x93, 0xd1, 0x7c, 0xd2, 0x33, 0x06, 0xcf, 0xd5, 0x47, 0x1c, 0x31, 0xfa, 0xdc, 0x30, 0xf5, 0x67,
0xcf, 0x66, 0xd8, 0x50, 0x0b, 0xda, 0xbf, 0x8b, 0xd0, 0x14, 0x41, 0x99, 0xfb, 0xab, 0xd0, 0x26,
0x2c, 0x8b, 0xd7, 0x64, 0x1d, 0x05, 0x96, 0x4d, 0xe2, 0x2c, 0xc6, 0x7b, 0x16, 0x90, 0xe8, 0xca,
0x0a, 0x17, 0xd2, 0x73, 0xb1, 0x41, 0x9f, 0x83, 0xc2, 0xb3, 0x49, 0x4d, 0xba, 0x0e, 0x08, 0xcf,
0x63, 0xab, 0xbb, 0x93, 0x16, 0x36, 0xcf, 0x15, 0x35, 0xd6, 0x01, 0xc1, 0x40, 0x93, 0x75, 0x1e,
0x0d, 0xe5, 0x07, 0xa0, 0x21, 0xad, 0xa1, 0x4a, 0xae, 0x86, 0x0e, 0x93, 0x84, 0x54, 0xa5, 0x96,
0xb7, 0xa2, 0x17, 0x27, 0x09, 0x75, 0xa0, 0xea, 0x2f, 0xcd, 0xc5, 0xc2, 0x6d, 0xd7, 0xb8, 0x99,
0x3f, 0xca, 0xf2, 0xce, 0x96, 0xc3, 0xe1, 0xb8, 0x27, 0xca, 0xa2, 0xe2, 0x2f, 0x87, 0x0b, 0x17,
0x7d, 0x0c, 0x2d, 0xf2, 0x2d, 0x25, 0xe1, 0xd2, 0x72, 0x4d, 0x6f, 0xcd, 0xba, 0x57, 0x9d, 0xbb,
0xbe, 0x11, 0x53, 0x27, 0x8c, 0x88, 0x3e, 0x81, 0xcd, 0x88, 0xfa, 0x81, 0x69, 0x5d, 0x50, 0x12,
0x9a, 0xb6, 0x1f, 0xac, 0xdb, 0x8d, 0xfd, 0xc2, 0x41, 0x1d, 0x6f, 0x30, 0x72, 0x8f, 0x51, 0x07,
0x7e, 0xb0, 0xd6, 0x5e, 0x42, 0x03, 0xfb, 0xb7, 0x83, 0x2b, 0xee, 0x8f, 0x06, 0xd5, 0x73, 0x72,
0xe1, 0x87, 0x44, 0x16, 0x2a, 0xc8, 0x46, 0x8e, 0xfd, 0x5b, 0x2c, 0x4f, 0xd0, 0x3e, 0x54, 0xb8,
0x4e, 0xd9, 0x2e, 0xb2, 0x2c, 0xe2, 0x40, 0xb3, 0xa0, 0x8e, 0xfd, 0x5b, 0x9e, 0x76, 0xf4, 0x01,
0x88, 0x00, 0x9b, 0x4b, 0xcb, 0x8b, 0xb3, 0xd7, 0xe0, 0x94, 0xa9, 0xe5, 0x11, 0xf4, 0x14, 0x94,
0xd0, 0xbf, 0x35, 0x6d, 0x7e, 0xbd, 0x40, 0xa2, 0xd2, 0xdd, 0xcd, 0x15, 0x67, 0x6c, 0x1c, 0x86,
0x30, 0x5e, 0x46, 0xda, 0x4b, 0x80, 0xb4, 0xb6, 0xee, 0xbb, 0xe4, 0xa7, 0x2c, 0x1b, 0xc4, 0x5d,
0xc4, 0xfa, 0x9b, 0xd2, 0x64, 0xae, 0x01, 0xcb, 0x33, 0xed, 0xf7, 0x05, 0x68, 0xcc, 0x59, 0xf5,
0x1c, 0x53, 0x67, 0xf1, 0x03, 0x6a, 0x0e, 0x41, 0xf9, 0x92, 0x3a, 0x0b, 0x5e, 0x6c, 0x0d, 0xcc,
0xd7, 0xe8, 0xf3, 0xd8, 0xb0, 0xc0, 0xbc, 0x8e, 0xda, 0x65, 0x7e, 0x7b, 0x2e, 0xbf, 0xbc, 0x10,
0xc7, 0x56, 0x44, 0x4f, 0x4f, 0x70, 0x9d, 0xb3, 0x9e, 0x9e, 0x44, 0xda, 0x57, 0x50, 0x39, 0xe3,
0x56, 0x3c, 0x05, 0x85, 0x2b, 0x37, 0x99, 0xb6, 0x18, 0xbb, 0xb9, 0xf0, 0x24, 0x16, 0x63, 0x88,
0xe2, 0x65, 0xa4, 0xf5, 0x60, 0xe3, 0x44, 0x5a, 0xcb, 0x19, 0xde, 0xdf, 0x1d, 0xed, 0x2f, 0x45,
0xa8, 0xbd, 0xf0, 0x57, 0xac, 0xa0, 0x50, 0x0b, 0x8a, 0xce, 0x82, 0xcb, 0x95, 0x70, 0xd1, 0x59,
0xa0, 0xdf, 0x40, 0xcb, 0x73, 0x2e, 0x43, 0x8b, 0x95, 0xa5, 0x40, 0x98, 0x68, 0x12, 0x3f, 0xce,
0x5a, 0x36, 0x89, 0x39, 0x38, 0xcc, 0x36, 0xbc, 0xec, 0x36, 0x03, 0x9c, 0x52, 0x0e, 0x38, 0x1f,
0x43, 0xcb, 0xf5, 0x6d, 0xcb, 0x35, 0x93, 0xb6, 0x5d, 0x16, 0xc5, 0xcd, 0xa9, 0xa7, 0x71, 0xef,
0xbe, 0x13, 0x97, 0xca, 0x03, 0xe3, 0x82, 0xbe, 0x80, 0x66, 0x60, 0x85, 0xd4, 0xb1, 0x9d, 0xc0,
0x62, 0x83, 0x4f, 0x95, 0x0b, 0xe6, 0xcc, 0xce, 0xc5, 0x0d, 0xe7, 0xd8, 0xd1, 0xa7, 0xa0, 0x46,
0xbc, 0x25, 0x99, 0xb7, 0x7e, 0x78, 0x7d, 0xe1, 0xfa, 0xb7, 0x51, 0xbb, 0xc6, 0xed, 0xdf, 0x14,
0xf4, 0xd7, 0x31, 0x59, 0xfb, 0x53, 0x09, 0xaa, 0x67, 0xa2, 0x3a, 0x0f, 0xa1, 0xcc, 0x63, 0x24,
0x86, 0x9b, 0xbd, 0xec, 0x65, 0x82, 0x83, 0x07, 0x88, 0xf3, 0xa0, 0x9f, 0x40, 0x83, 0x3a, 0x1e,
0x89, 0xa8, 0xe5, 0x05, 0x3c, 0xa8, 0x25, 0x9c, 0x12, 0xbe, 0xb3, 0xc4, 0x54, 0x28, 0xb1, 0xde,
0x21, 0xc2, 0xc4, 0x96, 0xe8, 0xe7, 0xd0, 0x60, 0x98, 0xe2, 0x03, 0x57, 0xbb, 0xc2, 0x41, 0xba,
0x73, 0x07, 0x51, 0xfc, 0x5a, 0x5c, 0x0f, 0x63, 0x94, 0xfe, 0x0a, 0x14, 0x8e, 0x02, 0x29, 0x24,
0x9a, 0xd6, 0x5e, 0xbe, 0x69, 0xc5, 0x68, 0xc3, 0x90, 0xf6, 0x79, 0xf4, 0x04, 0x2a, 0x37, 0xdc,
0xa4, 0x9a, 0x1c, 0xfc, 0xb2, 0xce, 0xf1, 0xf0, 0x8b, 0x73, 0xf6, 0x55, 0xfd, 0xad, 0xa8, 0x26,
0xde, 0xae, 0xee, 0x7c, 0x55, 0x65, 0xa1, 0xe1, 0x98, 0x87, 0x7b, 0xe5, 0xb9, 0xbc, 0x63, 0x31,
0xaf, 0x3c, 0x17, 0x7d, 0x04, 0x4d, 0x7b, 0x15, 0x86, 0x7c, 0xd4, 0x74, 0x3c, 0xd2, 0xde, 0xe1,
0xc1, 0x51, 0x24, 0xcd, 0x70, 0x3c, 0x82, 0x7e, 0x0d, 0x2d, 0xd7, 0x8a, 0x28, 0x03, 0x9b, 0x74,
0x64, 0x97, 0x5f, 0x95, 0x43, 0x9c, 0x00, 0x9b, 0xf0, 0x44, 0x71, 0xd3, 0x8d, 0x76, 0x05, 0xcd,
0x89, 0xb3, 0x74, 0x3c, 0xcb, 0xe5, 0xa0, 0x64, 0xc1, 0xce, 0xb4, 0x13, 0xbe, 0x7e, 0x58, 0x27,
0x41, 0x1f, 0x82, 0xc2, 0x4c, 0xb0, 0x7d, 0x77, 0xe5, 0x2d, 0x45, 0x85, 0x97, 0x70, 0x23, 0x38,
0x19, 0x08, 0x02, 0x43, 0xa7, 0xbc, 0x69, 0x6e, 0x5f, 0x11, 0xcf, 0x42, 0x9f, 0x25, 0x68, 0x10,
0x08, 0x6f, 0xe7, 0x71, 0x94, 0x1a, 0x15, 0xe3, 0x44, 0xfb, 0x7b, 0x11, 0x5a, 0x67, 0x62, 0xee,
0x88, 0x67, 0x9d, 0xaf, 0x60, 0x9b, 0x5c, 0x5c, 0x10, 0x9b, 0x3a, 0x37, 0xc4, 0xb4, 0x2d, 0xd7,
0x25, 0xa1, 0x29, 0x51, 0xab, 0x74, 0x37, 0x3b, 0xe2, 0xff, 0xc7, 0x80, 0xd3, 0x47, 0x43, 0xbc,
0x95, 0xf0, 0x4a, 0xd2, 0x02, 0xe9, 0xb0, 0xed, 0x78, 0x1e, 0x59, 0x38, 0x16, 0xcd, 0x2a, 0x10,
0x6d, 0x7e, 0x57, 0x7a, 0x7a, 0x66, 0x1c, 0x5b, 0x94, 0xa4, 0x6a, 0x12, 0x89, 0x44, 0xcd, 0xc7,
0xcc, 0x99, 0xf0, 0x32, 0x19, 0x9f, 0x36, 0xa4, 0xa4, 0xc1, 0x89, 0x58, 0x1e, 0xe6, 0x46, 0xb3,
0xf2, 0x9d, 0xd1, 0x2c, 0xfd, 0x7c, 0x56, 0xee, 0xfd, 0x7c, 0x7e, 0x09, 0x9b, 0xa2, 0xc5, 0xc6,
0xa9, 0x8f, 0x51, 0xfd, 0xbd, 0x7d, 0xb6, 0x49, 0xd3, 0x4d, 0xa4, 0x7d, 0x01, 0x9b, 0x49, 0x20,
0xe5, 0xe8, 0x76, 0x08, 0x55, 0x5e, 0x3e, 0x71, 0x3a, 0xd0, 0xdb, 0x90, 0xc5, 0x92, 0x43, 0xfb,
0x5d, 0x11, 0x50, 0x2c, 0xef, 0xdf, 0x46, 0xff, 0xa3, 0xc9, 0xd8, 0x81, 0x0a, 0xa7, 0xcb, 0x4c,
0x88, 0x0d, 0x8b, 0x03, 0x0b, 0x6a, 0x70, 0x9d, 0xa4, 0x41, 0x08, 0xbf, 0x64, 0xbf, 0x98, 0x44,
0x2b, 0x97, 0x62, 0xc9, 0xa1, 0xfd, 0xb5, 0x00, 0xdb, 0xb9, 0x38, 0xc8, 0x58, 0xa6, 0x88, 0x29,
0xbc, 0x03, 0x31, 0x07, 0x50, 0x0f, 0xae, 0xdf, 0x81, 0xac, 0xe4, 0xf4, 0x3b, 0x5b, 0xe0, 0x87,
0x50, 0x0e, 0x59, 0x2b, 0x16, 0xdf, 0xd7, 0xec, 0x40, 0xc2, 0xe9, 0x6c, 0xaa, 0xc9, 0xf9, 0x91,
0x9b, 0x6a, 0xa4, 0xfd, 0x0e, 0x28, 0x99, 0xce, 0xc0, 0x5a, 0x49, 0xbe, 0xaa, 0x64, 0xea, 0xbe,
0xb7, 0xa8, 0x94, 0x4c, 0x51, 0xb1, 0x2e, 0x6e, 0xfb, 0x5e, 0xe0, 0x12, 0x4a, 0x44, 0xca, 0xea,
0x38, 0x25, 0x68, 0x5f, 0x83, 0x92, 0x91, 0xbc, 0x6f, 0x78, 0x49, 0x93, 0x50, 0xba, 0x37, 0x09,
0xff, 0x28, 0xc0, 0x6e, 0x5a, 0xcc, 0x2b, 0x97, 0xfe, 0x5f, 0xd5, 0xa3, 0x16, 0xc2, 0xde, 0x5d,
0xef, 0xde, 0xab, 0xca, 0x7e, 0x40, 0xed, 0x1c, 0x7e, 0x09, 0x4a, 0x66, 0x06, 0x67, 0x7f, 0xd5,
0x47, 0xc7, 0xd3, 0x19, 0xd6, 0xd5, 0x47, 0xa8, 0x0e, 0xe5, 0xb9, 0x31, 0x3b, 0x55, 0x0b, 0x6c,
0xa5, 0x7f, 0xad, 0x0f, 0xc4, 0xdf, 0x7f, 0xb6, 0x32, 0x25, 0x53, 0xe9, 0xf0, 0x5f, 0x05, 0x80,
0xf4, 0x2b, 0x8f, 0x14, 0xa8, 0xbd, 0x9a, 0x9e, 0x4c, 0x67, 0xaf, 0xa7, 0x42, 0xc1, 0xb1, 0x31,
0x1a, 0xaa, 0x05, 0xd4, 0x80, 0x8a, 0x78, 0x4f, 0x28, 0xb2, 0x1b, 0xe4, 0x63, 0x42, 0x09, 0x35,
0xa1, 0x9e, 0xbc, 0x24, 0x94, 0x51, 0x0d, 0x4a, 0xc9, 0x7b, 0x81, 0x7c, 0x20, 0xa8, 0x32, 0x85,
0x58, 0x3f, 0x1d, 0xf7, 0x06, 0xba, 0x5a, 0x63, 0x07, 0xc9, 0x53, 0x01, 0x40, 0x35, 0x7e, 0x27,
0x60, 0x92, 0x73, 0xdd, 0x50, 0x81, 0xdd, 0x33, 0x33, 0x9e, 0xeb, 0x58, 0x55, 0x18, 0x0d, 0xcf,
0x5e, 0xab, 0x4d, 0x46, 0x7b, 0x36, 0xd2, 0xc7, 0x43, 0x75, 0x03, 0x6d, 0x40, 0xe3, 0xb9, 0xde,
0xc3, 0x46, 0x5f, 0xef, 0x19, 0x6a, 0x8b, 0x9d, 0x9c, 0x71, 0x03, 0x37, 0xd9, 0x35, 0x2f, 0x66,
0xaf, 0xf0, 0xb4, 0x37, 0x56, 0x55, 0xb6, 0x39, 0xd3, 0xf1, 0x7c, 0x34, 0x9b, 0xaa, 0x5b, 0xec,
0x9e, 0x71, 0x6f, 0x6e, 0x9c, 0x9e, 0xa8, 0xe8, 0xf0, 0x09, 0xfb, 0xb0, 0x65, 0xc7, 0x3c, 0x80,
0xaa, 0xd1, 0xeb, 0x8f, 0xf5, 0xb9, 0xfa, 0x88, 0xad, 0xe7, 0xcf, 0x7b, 0x78, 0x38, 0x57, 0x0b,
0xfd, 0x4f, 0xbf, 0x79, 0x72, 0xe3, 0x50, 0x12, 0x45, 0x1d, 0xc7, 0x3f, 0x12, 0xab, 0xa3, 0x4b,
0xff, 0xe8, 0x86, 0x1e, 0xf1, 0x37, 0xb0, 0xa3, 0x14, 0x64, 0xe7, 0x55, 0x4e, 0xf9, 0xc5, 0x7f,
0x02, 0x00, 0x00, 0xff, 0xff, 0xb6, 0x0c, 0x0f, 0x53, 0x5f, 0x13, 0x00, 0x00,
}

Просмотреть файл

@ -462,8 +462,8 @@ func (itc *internalTabletConn) StreamHealth(ctx context.Context, callback func(*
}
// VStream is part of queryservice.QueryService.
func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, filter, send)
func (itc *internalTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
err := itc.tablet.qsc.QueryService().VStream(ctx, target, startPos, tableLastPKs, filter, send)
return tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}

Просмотреть файл

@ -23,6 +23,8 @@ import (
"os"
"testing"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttest"
@ -207,9 +209,13 @@ func TestMain(m *testing.M) {
defer os.RemoveAll(cfg.SchemaDir)
cfg.TabletHostName = *tabletHostName
env, err := vttest.NewLocalTestEnvWithDirectory("", 9000, "/tmp/vttest")
if err != nil {
log.Errorf("err is %v", err)
}
cluster = &vttest.LocalCluster{
Config: cfg,
Env: env,
}
if err := cluster.Setup(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
@ -217,7 +223,6 @@ func TestMain(m *testing.M) {
return 1
}
defer cluster.TearDown()
vtParams = mysql.ConnParams{
Host: "localhost",
Port: cluster.Env.PortForProtocol("vtcombo_mysql_port", ""),

Просмотреть файл

@ -19,10 +19,14 @@ package endtoend
import (
"context"
"fmt"
"io"
"testing"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
@ -30,25 +34,32 @@ import (
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)
func TestVStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func initialize(ctx context.Context, t *testing.T) (*vtgateconn.VTGateConn, *mysql.Conn, *mysql.Conn, func()) {
gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
t.Fatal(err)
}
defer gconn.Close()
conn, err := mysql.Connect(ctx, &vtParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
mconn, err := mysql.Connect(ctx, &mysqlParams)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
close := func() {
gconn.Close()
conn.Close()
mconn.Close()
}
return gconn, conn, mconn, close
}
func TestVStream(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()
mpos, err := mconn.MasterPosition()
if err != nil {
@ -128,3 +139,88 @@ func TestVStream(t *testing.T) {
}
cancel()
}
func TestVStreamCopyNoPos(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "ks",
Shard: "-80",
Gtid: "",
}},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
_, _ = conn, mconn
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)
events, err := reader.Recv()
require.Errorf(t, err, "Stream needs a position or a table to copy")
require.Nil(t, events)
cancel()
}
func TestVStreamCopyBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gconn, conn, mconn, closeConnections := initialize(ctx, t)
defer closeConnections()
_, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
if err != nil {
t.Fatal(err)
}
lastPK := sqltypes.Result{
Fields: []*query.Field{{Name: "id1", Type: query.Type_INT32}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}},
}
qr := sqltypes.ResultToProto3(&lastPK)
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "ks",
Shard: "-80",
Gtid: "",
TablePKs: []*binlogdatapb.TableLastPK{{
TableName: "t1",
Lastpk: qr,
},
},
}},
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
_, _ = conn, mconn
if err != nil {
t.Fatal(err)
}
require.NotNil(t, reader)
for {
e, err := reader.Recv()
switch err {
case nil:
log.Infof("Received Events\n%v\n", e)
case io.EOF:
log.Infof("stream ended\n")
cancel()
default:
t.Fatalf("remote error: %v\n", err)
}
}
}

Просмотреть файл

@ -1,8 +1,7 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
@ -150,6 +149,8 @@ func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodat
newvgtid.ShardGtids = append(newvgtid.ShardGtids, sgtid)
}
}
//TODO add tablepk validations
return newvgtid, filter, nil
}
@ -212,7 +213,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
}
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, vs.filter, func(events []*binlogdatapb.VEvent) error {
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0
@ -250,6 +251,16 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
eventss = nil
sendevents = nil
case binlogdatapb.VEventType_LASTPK:
// don't send lastpk, sent as part of vgtid
if len(eventss) > 0 {
if err := vs.sendAll(sgtid, eventss); err != nil {
return err
}
eventss = nil
sendevents = nil
}
case binlogdatapb.VEventType_HEARTBEAT:
// Remove all heartbeat events for now.
// Otherwise they can accumulate indefinitely if there are no real events.

Просмотреть файл

@ -341,7 +341,7 @@ func (q *query) VStream(request *binlogdatapb.VStreamRequest, stream queryservic
request.EffectiveCallerId,
request.ImmediateCallerId,
)
err = q.server.VStream(ctx, request.Target, request.Position, request.Filter, func(events []*binlogdatapb.VEvent) error {
err = q.server.VStream(ctx, request.Target, request.Position, request.TableLastPKs, request.Filter, func(events []*binlogdatapb.VEvent) error {
return stream.Send(&binlogdatapb.VStreamResponse{
Events: events,
})

Просмотреть файл

@ -598,7 +598,7 @@ func (conn *gRPCQueryClient) StreamHealth(ctx context.Context, callback func(*qu
}
// VStream starts a VReplication stream.
func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
stream, err := func() (queryservicepb.Query_VStreamClient, error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
@ -612,6 +612,7 @@ func (conn *gRPCQueryClient) VStream(ctx context.Context, target *querypb.Target
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Position: position,
Filter: filter,
TableLastPKs: tablePKs,
}
stream, err := conn.c.VStream(ctx, req)
if err != nil {

Просмотреть файл

@ -97,7 +97,7 @@ type QueryService interface {
MessageAck(ctx context.Context, target *querypb.Target, name string, ids []*querypb.Value) (count int64, err error)
// VStream streams VReplication events based on the specified filter.
VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
// VStreamRows streams rows of a table from the specified starting point.
VStreamRows(ctx context.Context, target *querypb.Target, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error

Просмотреть файл

@ -236,9 +236,9 @@ func (ws *wrappedService) MessageAck(ctx context.Context, target *querypb.Target
return count, err
}
func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (ws *wrappedService) VStream(ctx context.Context, target *querypb.Target, startPos string, tableLastPKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return ws.wrapper(ctx, target, ws.impl, "VStream", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.VStream(ctx, target, startPos, filter, send)
innerErr := conn.VStream(ctx, target, startPos, tableLastPKs, filter, send)
return false, innerErr
})
}

Просмотреть файл

@ -344,7 +344,7 @@ func (sbc *SandboxConn) AddVStreamEvents(events []*binlogdatapb.VEvent, err erro
}
// VStream is part of the QueryService interface.
func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (sbc *SandboxConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
if sbc.StartPos != "" && sbc.StartPos != startPos {
return fmt.Errorf("startPos(%v): %v, want %v", target, startPos, sbc.StartPos)
}

Просмотреть файл

@ -666,14 +666,16 @@ var TestStreamHealthStreamHealthResponse = &querypb.StreamHealthResponse{
Shard: "test_shard",
TabletType: topodatapb.TabletType_RDONLY,
},
Serving: true,
Serving: true,
TabletExternallyReparentedTimestamp: 1234589,
RealtimeStats: &querypb.RealtimeStats{
CpuUsage: 1.0,
HealthError: "random error",
SecondsBehindMaster: 234,
BinlogPlayersCount: 1,
SecondsBehindMasterFilteredReplication: 2,
CpuUsage: 1.0,
},
}
@ -697,7 +699,7 @@ func (f *FakeQueryService) StreamHealth(ctx context.Context, callback func(*quer
}
// VStream is part of the queryservice.QueryService interface
func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (f *FakeQueryService) VStream(ctx context.Context, target *querypb.Target, position string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
panic("not implemented")
}

Просмотреть файл

@ -46,7 +46,7 @@ type VStreamerClient interface {
Close(context.Context) error
// VStream streams VReplication events based on the specified filter.
VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
// VStreamRows streams rows of a table from the specified starting point.
VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error
@ -124,7 +124,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error {
return nil
}
func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send)
}
@ -169,8 +169,8 @@ func (tc *tabletConnector) Close(ctx context.Context) error {
return tc.qs.Close(ctx)
}
func (tc *tabletConnector) VStream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return tc.qs.VStream(ctx, tc.target, startPos, filter, send)
func (tc *tabletConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return tc.qs.VStream(ctx, tc.target, startPos, tablePKs, filter, send)
}
func (tc *tabletConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {

Просмотреть файл

@ -224,7 +224,7 @@ func (ftc *fakeTabletConn) StreamHealth(ctx context.Context, callback func(*quer
var vstreamHook func(ctx context.Context)
// VStream directly calls into the pre-initialized engine.
func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
if target.Keyspace != "vttest" {
<-ctx.Done()
return io.EOF
@ -232,7 +232,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, target *querypb.Target,
if vstreamHook != nil {
vstreamHook(ctx)
}
return streamerEngine.Stream(ctx, startPos, nil, filter, send)
return streamerEngine.Stream(ctx, startPos, tablePKs, filter, send)
}
// vstreamRowsHook allows you to do work just before calling VStreamRows.

Просмотреть файл

@ -19,8 +19,6 @@ package messager
import (
"sync"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
@ -46,7 +44,7 @@ type TabletService interface {
// VStreamer defines the functions of VStreamer
// that the messager needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error
}

Просмотреть файл

@ -26,8 +26,6 @@ import (
"testing"
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/test/utils"
@ -893,7 +891,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp
fv.pollerResponse = pr
}
func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
fv.streamInvocations.Add(1)
for {
fv.mu.Lock()

Просмотреть файл

@ -19,8 +19,6 @@ package tabletserver
import (
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
"golang.org/x/net/context"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
@ -33,7 +31,7 @@ import (
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*vstreamer.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
}
// ReplicationWatcher is a tabletserver service that watches the

Просмотреть файл

@ -1353,11 +1353,11 @@ func (tsv *TabletServer) execDML(ctx context.Context, target *querypb.Target, qu
}
// VStream streams VReplication events.
func (tsv *TabletServer) VStream(ctx context.Context, target *querypb.Target, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (tsv *TabletServer) VStream(ctx context.Context, target *querypb.Target, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
if err := tsv.verifyTarget(ctx, target); err != nil {
return err
}
return tsv.vstreamer.Stream(ctx, startPos, nil, filter, send)
return tsv.vstreamer.Stream(ctx, startPos, tablePKs, filter, send)
}
// VStreamRows streams rows from the specified starting point.

Просмотреть файл

@ -116,13 +116,34 @@ func (uvs *uvstreamer) sendEventsForRows(ctx context.Context, tableName string,
return nil
}
func getLastPKFromQR(qr *querypb.QueryResult) []sqltypes.Value {
var lastPK []sqltypes.Value
r := sqltypes.Proto3ToResult(qr)
if len(r.Rows) != 1 {
log.Errorf("unexpected lastpk input: %v", qr)
return nil
}
lastPK = r.Rows[0]
return lastPK
}
func getQRFromLastPK(fields []*querypb.Field, lastPK []sqltypes.Value) *querypb.QueryResult {
row := sqltypes.RowToProto3(lastPK)
qr := &querypb.QueryResult{
Fields: fields,
Rows: []*querypb.Row{row},
}
return qr
}
func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var err error
var newLastPK *sqltypes.Result
lastPK := uvs.plans[tableName].tablePK.lastPK.Rows[0]
lastPK := getLastPKFromQR(uvs.plans[tableName].tablePK.Lastpk)
filter := uvs.plans[tableName].rule.Filter
log.Infof("Starting copyTable for %s, PK %v", tableName, lastPK)
uvs.sendTestEvent(fmt.Sprintf("Copy Start %s", tableName))
@ -141,9 +162,9 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
pos, _ := mysql.DecodePosition(rows.Gtid)
if !uvs.pos.IsZero() && !uvs.pos.AtLeast(pos) {
uvs.fastForward(ctx, rows.Gtid)
}
if mysql.EncodePosition(uvs.pos) != rows.Gtid {
log.Errorf("Position after fastforward was %s but stopPos was %s", uvs.pos, rows.Gtid)
if mysql.EncodePosition(uvs.pos) != rows.Gtid {
log.Errorf("Position after fastforward was %s but stopPos was %s", uvs.pos, rows.Gtid)
}
}
fieldEvent := &binlogdatapb.FieldEvent{
@ -155,9 +176,10 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
uvs.sendFieldEvent(ctx, rows.Gtid, fieldEvent)
}
if len(rows.Rows) == 0 {
log.Infof("0 rows returned for table %s", tableName)
//log.Infof("0 rows returned for table %s", tableName)
return nil
}
uvs.sendEventsForRows(ctx, tableName, rows)
newLastPK = sqltypes.CustomProto3ToResult(uvs.pkfields, &querypb.QueryResult{

Просмотреть файл

@ -38,12 +38,6 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
// TableLastPK records the lastPK seen during the copy phase for a table
type TableLastPK struct {
name string
lastPK *sqltypes.Result
}
// Engine is the engine for handling vreplication streaming requests.
type Engine struct {
env tabletenv.Env
@ -151,7 +145,7 @@ func (vse *Engine) vschema() *vindexes.VSchema {
}
// Stream starts a new stream.
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
// Ensure vschema is initialized and the watcher is started.
// Starting of the watcher has to be delayed till the first call to Stream
// because this overhead should be incurred only if someone uses this feature.

Просмотреть файл

@ -21,7 +21,7 @@ import (
var uvstreamerTestMode = false // Only used for testing
type tablePlan struct {
tablePK *TableLastPK
tablePK *binlogdatapb.TableLastPK
rule *binlogdatapb.Rule
}
@ -69,7 +69,7 @@ type uvstreamerConfig struct {
CatchupRetryTime time.Duration
}
func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, sh schema.Historian, startPos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer {
func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, sh schema.Historian, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer {
ctx, cancel := context.WithCancel(ctx)
config := &uvstreamerConfig{
MaxReplicationLag: 1 * time.Nanosecond,
@ -98,8 +98,8 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
uvs.plans[rule.Match] = plan //TODO: only handles actual table name now, no regular expressions
}
for _, tablePK := range tablePKs {
uvs.plans[tablePK.name].tablePK = tablePK
uvs.tablesToCopy = append(uvs.tablesToCopy, tablePK.name)
uvs.plans[tablePK.TableName].tablePK = tablePK
uvs.tablesToCopy = append(uvs.tablesToCopy, tablePK.TableName)
}
sort.Strings(uvs.tablesToCopy)
}
@ -136,16 +136,16 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
if !shouldSend && tableName != "" {
shouldSend = true
_, ok := uvs.plans[tableName]
if ok && uvs.plans[tableName].tablePK.lastPK.Rows[0] == nil {
if ok && uvs.plans[tableName].tablePK == nil {
shouldSend = false
}
}
if shouldSend {
evs2 = append(evs2, ev)
//log.Infof("shouldSend: sending %v table %s", ev.String(), tableName)
} else {
//log.Infof("shouldSend: filtering out %v", ev.String())
}
//log.Infof("shouldSend: filtering out %v", ev.String())
}
return evs2
}
@ -231,7 +231,7 @@ func (uvs *uvstreamer) init() error {
return err
} //startpos validation for tablepk != nil
if uvs.pos.IsZero() && (len(uvs.plans) == 0) {
return fmt.Errorf("Stream needs atleast a position or a table to copy")
return fmt.Errorf("Stream needs a position or a table to copy")
}
return nil
}
@ -271,8 +271,8 @@ func (uvs *uvstreamer) SetVSchema(vschema *localVSchema) {
}
func (uvs *uvstreamer) setCopyState(tableName string, lastPK *sqltypes.Result) {
uvs.plans[tableName].tablePK.lastPK = lastPK
qr := sqltypes.ResultToProto3(lastPK)
uvs.plans[tableName].tablePK.Lastpk = qr
lastPKEvent := &binlogdatapb.LastPKEvent{
TableLastPK: &binlogdatapb.TableLastPK{
TableName: tableName,

Просмотреть файл

@ -8,12 +8,12 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/binlogdata"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/query"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
const (
@ -66,8 +66,8 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
initialize(t)
engine.se.Reload(context.Background())
var rules []*binlogdata.Rule
var tablePKs []*TableLastPK
var rules []*binlogdatapb.Rule
var tablePKs []*binlogdatapb.TableLastPK
for i, table := range testState.tables {
rules = append(rules, getRule(table))
tablePKs = append(tablePKs, getTablePK(table, i+1))
@ -200,20 +200,20 @@ func initialize(t *testing.T) {
}
}
func getRule(table string) *binlogdata.Rule {
return &binlogdata.Rule{
func getRule(table string) *binlogdatapb.Rule {
return &binlogdatapb.Rule{
Match: table,
Filter: fmt.Sprintf("select * from %s", table),
}
}
func getTablePK(table string, idx int) *TableLastPK {
return &TableLastPK{
name: table,
lastPK: &sqltypes.Result{
Fields: []*query.Field{{Name: fmt.Sprintf("id%d1", idx), Type: query.Type_INT32}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}},
},
func getTablePK(table string, idx int) *binlogdatapb.TableLastPK {
fields := []*query.Field{{Name: fmt.Sprintf("id%d1", idx), Type: query.Type_INT32}}
lastPK := []sqltypes.Value{sqltypes.NewInt32(0)}
return &binlogdatapb.TableLastPK{
TableName: table,
Lastpk: getQRFromLastPK(fields, lastPK),
}
}
@ -242,7 +242,7 @@ func getEventCallback(ctx context.Context, t *testing.T, event *binlogdatapb.VEv
return nil
}
func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*TableLastPK) {
func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) {
pos := ""
go func() {
err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error {

Просмотреть файл

@ -23,12 +23,10 @@ import (
"strings"
"testing"
"time"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
@ -137,22 +135,9 @@ func TestVStreamCopySimpleFlow(t *testing.T) {
}},
}
var tablePKs []*TableLastPK
tablePKs = append(tablePKs, &TableLastPK{
name: "t1",
lastPK: &sqltypes.Result{
Fields: []*query.Field{{Name: "id11", Type: query.Type_INT32}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}},
},
})
tablePKs = append(tablePKs, &TableLastPK{
name: "t2",
lastPK: &sqltypes.Result{
Fields: []*query.Field{{Name: "id21", Type: query.Type_INT32}},
Rows: [][]sqltypes.Value{{sqltypes.NewInt32(0)}},
},
})
var tablePKs []*binlogdatapb.TableLastPK
tablePKs = append(tablePKs, getTablePK("t1", 1))
tablePKs = append(tablePKs, getTablePK("t2", 2))
t1FieldEvent := []string{"type:FIELD field_event:<table_name:\"t1\" fields:<name:\"id11\" type:INT32 > fields:<name:\"id12\" type:INT32 > > "}
t2FieldEvent := []string{"type:FIELD field_event:<table_name:\"t2\" fields:<name:\"id21\" type:INT32 > fields:<name:\"id22\" type:INT32 > > "}
@ -1455,7 +1440,7 @@ func TestFilteredMultipleWhere(t *testing.T) {
runCases(t, filter, testcases, "", nil)
}
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*TableLastPK) {
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, position string, tablePK []*binlogdatapb.TableLastPK) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -1548,7 +1533,7 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
var lastPos string
func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*TableLastPK) <-chan []*binlogdatapb.VEvent {
func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) <-chan []*binlogdatapb.VEvent {
if position == "" && len(tablePKs) == 0 {
position = masterPosition(t)
}
@ -1564,7 +1549,7 @@ func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter,
return ch
}
func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error {
func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error {
if filter == nil {
filter = &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{

Просмотреть файл

@ -365,6 +365,7 @@ message VStreamRequest {
string position = 4;
Filter filter = 5;
repeated TableLastPK table_last_p_ks = 6;
}
// VStreamResponse is the response from VStreamer