vreplication: tabletserver inits and other tweaks

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2018-12-29 14:37:24 -08:00
Родитель ec4e808b47
Коммит 8cd1c5b33f
7 изменённых файлов: 190 добавлений и 205 удалений

Просмотреть файл

@ -21,6 +21,8 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
// VEventType enumerates the event types.
// This list is comprehensive. Many of these types
// will not be encountered in RBR mode.
type VEventType int32
const (
@ -34,19 +36,21 @@ const (
VEventType_UPDATE VEventType = 7
VEventType_DELETE VEventType = 8
VEventType_SET VEventType = 9
VEventType_ROW VEventType = 10
)
var VEventType_name = map[int32]string{
0: "UNKNOWN",
1: "GTID",
2: "BEGIN",
3: "COMMIT",
4: "ROLLBACK",
5: "DDL",
6: "INSERT",
7: "UPDATE",
8: "DELETE",
9: "SET",
0: "UNKNOWN",
1: "GTID",
2: "BEGIN",
3: "COMMIT",
4: "ROLLBACK",
5: "DDL",
6: "INSERT",
7: "UPDATE",
8: "DELETE",
9: "SET",
10: "ROW",
}
var VEventType_value = map[string]int32{
"UNKNOWN": 0,
@ -59,13 +63,14 @@ var VEventType_value = map[string]int32{
"UPDATE": 7,
"DELETE": 8,
"SET": 9,
"ROW": 10,
}
func (x VEventType) String() string {
return proto.EnumName(VEventType_name, int32(x))
}
func (VEventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{0}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{0}
}
type BinlogTransaction_Statement_Category int32
@ -113,7 +118,7 @@ func (x BinlogTransaction_Statement_Category) String() string {
return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x))
}
func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1, 0, 0}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1, 0, 0}
}
// Charset is the per-statement charset info from a QUERY_EVENT binlog entry.
@ -133,7 +138,7 @@ func (m *Charset) Reset() { *m = Charset{} }
func (m *Charset) String() string { return proto.CompactTextString(m) }
func (*Charset) ProtoMessage() {}
func (*Charset) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{0}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{0}
}
func (m *Charset) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Charset.Unmarshal(m, b)
@ -190,7 +195,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} }
func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction) ProtoMessage() {}
func (*BinlogTransaction) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1}
}
func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b)
@ -240,7 +245,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S
func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction_Statement) ProtoMessage() {}
func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1, 0}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1, 0}
}
func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b)
@ -298,7 +303,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} }
func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeRequest) ProtoMessage() {}
func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{2}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{2}
}
func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b)
@ -351,7 +356,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{}
func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeResponse) ProtoMessage() {}
func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{3}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{3}
}
func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b)
@ -395,7 +400,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} }
func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) }
func (*StreamTablesRequest) ProtoMessage() {}
func (*StreamTablesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{4}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{4}
}
func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b)
@ -448,7 +453,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} }
func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) }
func (*StreamTablesResponse) ProtoMessage() {}
func (*StreamTablesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{5}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{5}
}
func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b)
@ -493,7 +498,7 @@ func (m *Rule) Reset() { *m = Rule{} }
func (m *Rule) String() string { return proto.CompactTextString(m) }
func (*Rule) ProtoMessage() {}
func (*Rule) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{6}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{6}
}
func (m *Rule) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Rule.Unmarshal(m, b)
@ -540,7 +545,7 @@ func (m *Filter) Reset() { *m = Filter{} }
func (m *Filter) String() string { return proto.CompactTextString(m) }
func (*Filter) ProtoMessage() {}
func (*Filter) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{7}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{7}
}
func (m *Filter) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filter.Unmarshal(m, b)
@ -593,7 +598,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} }
func (m *BinlogSource) String() string { return proto.CompactTextString(m) }
func (*BinlogSource) ProtoMessage() {}
func (*BinlogSource) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{8}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{8}
}
func (m *BinlogSource) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogSource.Unmarshal(m, b)
@ -667,7 +672,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} }
func (m *RowEvent) String() string { return proto.CompactTextString(m) }
func (*RowEvent) ProtoMessage() {}
func (*RowEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{9}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{9}
}
func (m *RowEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RowEvent.Unmarshal(m, b)
@ -716,7 +721,7 @@ func (m *VEvent) Reset() { *m = VEvent{} }
func (m *VEvent) String() string { return proto.CompactTextString(m) }
func (*VEvent) ProtoMessage() {}
func (*VEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{10}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{10}
}
func (m *VEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VEvent.Unmarshal(m, b)
@ -777,7 +782,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} }
func (m *VStreamRequest) String() string { return proto.CompactTextString(m) }
func (*VStreamRequest) ProtoMessage() {}
func (*VStreamRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{11}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{11}
}
func (m *VStreamRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamRequest.Unmarshal(m, b)
@ -823,7 +828,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} }
func (m *VStreamResponse) String() string { return proto.CompactTextString(m) }
func (*VStreamResponse) ProtoMessage() {}
func (*VStreamResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{12}
return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{12}
}
func (m *VStreamResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamResponse.Unmarshal(m, b)
@ -869,65 +874,65 @@ func init() {
proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value)
}
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_9f7a55f16227bf1c) }
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_41dfc977aeca6f81) }
var fileDescriptor_binlogdata_9f7a55f16227bf1c = []byte{
// 900 bytes of a gzipped FileDescriptorProto
var fileDescriptor_binlogdata_41dfc977aeca6f81 = []byte{
// 906 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcd, 0x6e, 0xdb, 0x46,
0x10, 0x0e, 0x45, 0x8a, 0x22, 0x47, 0xae, 0x4d, 0xaf, 0x1d, 0x43, 0x30, 0x50, 0x40, 0xe0, 0xa1,
0x51, 0x0d, 0x54, 0x0a, 0x94, 0xf6, 0xd4, 0x93, 0x29, 0xb1, 0x86, 0x62, 0x5a, 0x36, 0x56, 0x74,
0x5a, 0xe4, 0x42, 0x50, 0xd2, 0x5a, 0x16, 0x2c, 0x73, 0x65, 0xee, 0xca, 0xae, 0x2e, 0x7d, 0x85,
0x1e, 0xfa, 0x14, 0x7d, 0x91, 0xbe, 0x49, 0x6f, 0x7d, 0x88, 0x62, 0x7f, 0x44, 0x49, 0x09, 0xe0,
0xba, 0x87, 0xdc, 0xbe, 0xd9, 0x99, 0x9d, 0x9d, 0xf9, 0x66, 0xf8, 0x11, 0xbc, 0xe1, 0x34, 0x9b,
0xd1, 0xc9, 0x38, 0xe5, 0x69, 0x73, 0x9e, 0x53, 0x4e, 0x11, 0xac, 0x4f, 0x8e, 0xab, 0x0f, 0x0b,
0x92, 0x2f, 0x95, 0xe3, 0x78, 0x97, 0xd3, 0x39, 0x5d, 0x07, 0xfa, 0x17, 0x50, 0xe9, 0xdc, 0xa6,
0x39, 0x23, 0x1c, 0x1d, 0x81, 0x3d, 0x9a, 0x4d, 0x49, 0xc6, 0x6b, 0x46, 0xdd, 0x68, 0x94, 0xb1,
0xb6, 0x10, 0x02, 0x6b, 0x44, 0xb3, 0xac, 0x56, 0x92, 0xa7, 0x12, 0x8b, 0x58, 0x46, 0xf2, 0x47,
0x92, 0xd7, 0x4c, 0x15, 0xab, 0x2c, 0xff, 0x6f, 0x13, 0xf6, 0x03, 0xf9, 0x74, 0x9c, 0xa7, 0x19,
0x4b, 0x47, 0x7c, 0x4a, 0x33, 0x74, 0x06, 0xc0, 0x78, 0xca, 0xc9, 0x3d, 0xc9, 0x38, 0xab, 0x19,
0x75, 0xb3, 0x51, 0x6d, 0xbf, 0x69, 0x6e, 0x14, 0xfd, 0xd9, 0x95, 0xe6, 0x60, 0x15, 0x8f, 0x37,
0xae, 0xa2, 0x36, 0x54, 0xc9, 0x23, 0xc9, 0x78, 0xc2, 0xe9, 0x1d, 0xc9, 0x6a, 0x56, 0xdd, 0x68,
0x54, 0xdb, 0xfb, 0x4d, 0xd5, 0x60, 0x28, 0x3c, 0xb1, 0x70, 0x60, 0x20, 0x05, 0x3e, 0xfe, 0xab,
0x04, 0x6e, 0x91, 0x0d, 0x45, 0xe0, 0x8c, 0x52, 0x4e, 0x26, 0x34, 0x5f, 0xca, 0x36, 0x77, 0xdb,
0x6f, 0x5f, 0x58, 0x48, 0xb3, 0xa3, 0xef, 0xe1, 0x22, 0x03, 0xfa, 0x0e, 0x2a, 0x23, 0xc5, 0x9e,
0x64, 0xa7, 0xda, 0x3e, 0xd8, 0x4c, 0xa6, 0x89, 0xc5, 0xab, 0x18, 0xe4, 0x81, 0xc9, 0x1e, 0x66,
0x92, 0xb2, 0x1d, 0x2c, 0xa0, 0xff, 0xa7, 0x01, 0xce, 0x2a, 0x2f, 0x3a, 0x80, 0xbd, 0x20, 0x4a,
0xae, 0xfb, 0x38, 0xec, 0x5c, 0x9e, 0xf5, 0x7b, 0x1f, 0xc3, 0xae, 0xf7, 0x0a, 0xed, 0x80, 0x13,
0x44, 0x49, 0x10, 0x9e, 0xf5, 0xfa, 0x9e, 0x81, 0xbe, 0x02, 0x37, 0x88, 0x92, 0xce, 0xe5, 0xc5,
0x45, 0x2f, 0xf6, 0x4a, 0x68, 0x0f, 0xaa, 0x41, 0x94, 0xe0, 0xcb, 0x28, 0x0a, 0x4e, 0x3b, 0xe7,
0x9e, 0x89, 0x5e, 0xc3, 0x7e, 0x10, 0x25, 0xdd, 0x8b, 0x28, 0xe9, 0x86, 0x57, 0x38, 0xec, 0x9c,
0xc6, 0x61, 0xd7, 0xb3, 0x10, 0x80, 0x2d, 0x8e, 0xbb, 0x91, 0x57, 0xd6, 0x78, 0x10, 0xc6, 0x9e,
0xad, 0xd3, 0xf5, 0xfa, 0x83, 0x10, 0xc7, 0x5e, 0x45, 0x9b, 0xd7, 0x57, 0xdd, 0xd3, 0x38, 0xf4,
0x1c, 0x6d, 0x76, 0xc3, 0x28, 0x8c, 0x43, 0xcf, 0x7d, 0x6f, 0x39, 0x25, 0xcf, 0x7c, 0x6f, 0x39,
0xa6, 0x67, 0xf9, 0x7f, 0x18, 0xf0, 0x7a, 0xc0, 0x73, 0x92, 0xde, 0x9f, 0x93, 0x25, 0x4e, 0xb3,
0x09, 0xc1, 0xe4, 0x61, 0x41, 0x18, 0x47, 0xc7, 0xe0, 0xcc, 0x29, 0x9b, 0x0a, 0xee, 0x24, 0xc1,
0x2e, 0x2e, 0x6c, 0xd4, 0x02, 0xf7, 0x8e, 0x2c, 0x93, 0x5c, 0xc4, 0x6b, 0xc2, 0x50, 0xb3, 0x58,
0xc8, 0x22, 0x93, 0x73, 0xa7, 0xd1, 0x26, 0xbf, 0xe6, 0x7f, 0xf3, 0xeb, 0xdf, 0xc0, 0xd1, 0xa7,
0x45, 0xb1, 0x39, 0xcd, 0x18, 0x41, 0x11, 0x20, 0x75, 0x31, 0xe1, 0xeb, 0xd9, 0xca, 0xfa, 0xaa,
0xed, 0xaf, 0x9f, 0x5d, 0x00, 0xbc, 0x3f, 0xfc, 0xf4, 0xc8, 0xff, 0x15, 0x0e, 0xd4, 0x3b, 0x71,
0x3a, 0x9c, 0x11, 0xf6, 0x92, 0xd6, 0x8f, 0xc0, 0xe6, 0x32, 0xb8, 0x56, 0xaa, 0x9b, 0x0d, 0x17,
0x6b, 0xeb, 0xff, 0x76, 0x38, 0x86, 0xc3, 0xed, 0x97, 0xbf, 0x48, 0x7f, 0xdf, 0x83, 0x85, 0x17,
0x33, 0x82, 0x0e, 0xa1, 0x7c, 0x9f, 0xf2, 0xd1, 0xad, 0xee, 0x46, 0x19, 0xa2, 0x95, 0x9b, 0xe9,
0x8c, 0x93, 0x5c, 0x8e, 0xd0, 0xc5, 0xda, 0xf2, 0xdf, 0x82, 0xfd, 0x93, 0x44, 0xe8, 0x1b, 0x28,
0xe7, 0x0b, 0xd1, 0xab, 0xfa, 0xd4, 0xbd, 0xcd, 0x02, 0x44, 0x62, 0xac, 0xdc, 0xfe, 0x3f, 0x06,
0xec, 0xa8, 0x82, 0x06, 0x74, 0x91, 0x8f, 0x88, 0x60, 0xf0, 0x8e, 0x2c, 0xd9, 0x3c, 0x1d, 0x91,
0x15, 0x83, 0x2b, 0x5b, 0x14, 0xc3, 0x6e, 0xd3, 0x7c, 0xac, 0x5f, 0x55, 0x06, 0xfa, 0x01, 0xaa,
0x92, 0x49, 0x9e, 0xf0, 0xe5, 0x9c, 0x48, 0x0e, 0x77, 0xdb, 0x87, 0xeb, 0xa5, 0x92, 0x3c, 0xf1,
0x78, 0x39, 0x27, 0x18, 0x78, 0x81, 0xb7, 0x37, 0xd1, 0x7a, 0xc1, 0x26, 0xae, 0xe7, 0x57, 0xde,
0x9a, 0xdf, 0x49, 0x41, 0x86, 0xad, 0xb3, 0x6c, 0xf4, 0xaa, 0xe8, 0x28, 0x08, 0xba, 0x02, 0x07,
0xd3, 0x27, 0x29, 0x53, 0xc8, 0x07, 0x7b, 0x48, 0x6e, 0x68, 0x4e, 0xf4, 0x90, 0x40, 0x8b, 0x18,
0xa6, 0x4f, 0x58, 0x7b, 0x50, 0x1d, 0xca, 0xe9, 0xcd, 0x8a, 0xe7, 0xed, 0x10, 0xe5, 0xf0, 0x7f,
0x37, 0xc0, 0xfe, 0xa0, 0x12, 0x9e, 0x80, 0x25, 0x19, 0x50, 0xa2, 0x76, 0xb4, 0x59, 0x86, 0x8a,
0x90, 0x1c, 0xc8, 0x18, 0xa1, 0xe8, 0x13, 0x3e, 0x5d, 0x31, 0x29, 0xb1, 0xd0, 0xa6, 0xf1, 0x58,
0x69, 0x93, 0x8b, 0x05, 0x44, 0xef, 0x00, 0x72, 0xfa, 0x94, 0x48, 0x29, 0x65, 0x35, 0x4b, 0x8e,
0xf2, 0x70, 0x6b, 0x94, 0xba, 0x19, 0xec, 0xe6, 0x1a, 0x31, 0xff, 0x17, 0xd8, 0xfd, 0xa0, 0x36,
0xf4, 0x25, 0x5f, 0xc5, 0xc9, 0xd6, 0x2a, 0x3d, 0xcf, 0xde, 0x8f, 0xb0, 0x57, 0x64, 0xd6, 0x5b,
0xdf, 0x80, 0xb2, 0xac, 0x4e, 0xef, 0x19, 0xfa, 0xbc, 0x69, 0xac, 0x02, 0x4e, 0x7e, 0x03, 0x58,
0xb3, 0x80, 0xaa, 0x50, 0xb9, 0xee, 0x9f, 0xf7, 0x2f, 0x7f, 0xee, 0x7b, 0xaf, 0x90, 0x03, 0xd6,
0x59, 0xdc, 0xeb, 0x7a, 0x06, 0x72, 0xa1, 0xac, 0x74, 0xb6, 0x24, 0x44, 0x52, 0x8b, 0xac, 0x29,
0x14, 0xb8, 0x50, 0x58, 0x0b, 0x55, 0xc0, 0x2c, 0x74, 0x54, 0x0b, 0xa7, 0x2d, 0xb0, 0x56, 0xcd,
0x8a, 0xc0, 0x5a, 0x32, 0x1d, 0x11, 0x2c, 0x84, 0xd6, 0x0d, 0xbe, 0xfd, 0xf8, 0xe6, 0x71, 0xca,
0x09, 0x63, 0xcd, 0x29, 0x6d, 0x29, 0xd4, 0x9a, 0xd0, 0xd6, 0x23, 0x6f, 0xc9, 0xdf, 0x70, 0x6b,
0x5d, 0xf8, 0xd0, 0x96, 0x27, 0xef, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x94, 0x9d, 0xa9, 0x2b,
0xd5, 0x07, 0x00, 0x00,
0x10, 0x0e, 0x45, 0x8a, 0x22, 0x47, 0xa9, 0xbd, 0x5e, 0x3b, 0x86, 0x60, 0xa0, 0x80, 0xc0, 0x43,
0xa3, 0x1a, 0xa8, 0x14, 0x28, 0xed, 0xa9, 0x27, 0x53, 0x62, 0x0d, 0xc5, 0xb4, 0x64, 0xac, 0xe8,
0xa4, 0xc8, 0x85, 0xa0, 0xa4, 0xb5, 0x2c, 0x58, 0xe6, 0xca, 0xe4, 0xca, 0xae, 0x9e, 0xa0, 0xc7,
0x1e, 0xfa, 0x14, 0x7d, 0x91, 0xbe, 0x49, 0x6f, 0x7d, 0x88, 0x62, 0x7f, 0x44, 0x49, 0x09, 0x90,
0xba, 0x87, 0xde, 0xbe, 0xd9, 0x99, 0x9d, 0x9d, 0xf9, 0x66, 0xf8, 0x11, 0xd0, 0x68, 0x96, 0xce,
0xd9, 0x74, 0x92, 0xf0, 0xa4, 0xb9, 0xc8, 0x18, 0x67, 0x18, 0x36, 0x27, 0x27, 0xd5, 0x87, 0x25,
0xcd, 0x56, 0xca, 0x71, 0xb2, 0xc7, 0xd9, 0x82, 0x6d, 0x02, 0xbd, 0x4b, 0xa8, 0x74, 0x6e, 0x93,
0x2c, 0xa7, 0x1c, 0x1f, 0x83, 0x3d, 0x9e, 0xcf, 0x68, 0xca, 0x6b, 0x46, 0xdd, 0x68, 0x94, 0x89,
0xb6, 0x30, 0x06, 0x6b, 0xcc, 0xd2, 0xb4, 0x56, 0x92, 0xa7, 0x12, 0x8b, 0xd8, 0x9c, 0x66, 0x8f,
0x34, 0xab, 0x99, 0x2a, 0x56, 0x59, 0xde, 0x5f, 0x26, 0x1c, 0xf8, 0xf2, 0xe9, 0x28, 0x4b, 0xd2,
0x3c, 0x19, 0xf3, 0x19, 0x4b, 0xf1, 0x39, 0x40, 0xce, 0x13, 0x4e, 0xef, 0x69, 0xca, 0xf3, 0x9a,
0x51, 0x37, 0x1b, 0xd5, 0xf6, 0xeb, 0xe6, 0x56, 0xd1, 0x9f, 0x5d, 0x69, 0x0e, 0xd7, 0xf1, 0x64,
0xeb, 0x2a, 0x6e, 0x43, 0x95, 0x3e, 0xd2, 0x94, 0xc7, 0x9c, 0xdd, 0xd1, 0xb4, 0x66, 0xd5, 0x8d,
0x46, 0xb5, 0x7d, 0xd0, 0x54, 0x0d, 0x06, 0xc2, 0x13, 0x09, 0x07, 0x01, 0x5a, 0xe0, 0x93, 0x3f,
0x4b, 0xe0, 0x16, 0xd9, 0x70, 0x08, 0xce, 0x38, 0xe1, 0x74, 0xca, 0xb2, 0x95, 0x6c, 0x73, 0xaf,
0xfd, 0xe6, 0x99, 0x85, 0x34, 0x3b, 0xfa, 0x1e, 0x29, 0x32, 0xe0, 0xef, 0xa0, 0x32, 0x56, 0xec,
0x49, 0x76, 0xaa, 0xed, 0xc3, 0xed, 0x64, 0x9a, 0x58, 0xb2, 0x8e, 0xc1, 0x08, 0xcc, 0xfc, 0x61,
0x2e, 0x29, 0x7b, 0x49, 0x04, 0xf4, 0xfe, 0x30, 0xc0, 0x59, 0xe7, 0xc5, 0x87, 0xb0, 0xef, 0x87,
0xf1, 0x75, 0x9f, 0x04, 0x9d, 0xc1, 0x79, 0xbf, 0xf7, 0x31, 0xe8, 0xa2, 0x17, 0xf8, 0x25, 0x38,
0x7e, 0x18, 0xfb, 0xc1, 0x79, 0xaf, 0x8f, 0x0c, 0xfc, 0x15, 0xb8, 0x7e, 0x18, 0x77, 0x06, 0x97,
0x97, 0xbd, 0x08, 0x95, 0xf0, 0x3e, 0x54, 0xfd, 0x30, 0x26, 0x83, 0x30, 0xf4, 0xcf, 0x3a, 0x17,
0xc8, 0xc4, 0xaf, 0xe0, 0xc0, 0x0f, 0xe3, 0xee, 0x65, 0x18, 0x77, 0x83, 0x2b, 0x12, 0x74, 0xce,
0xa2, 0xa0, 0x8b, 0x2c, 0x0c, 0x60, 0x8b, 0xe3, 0x6e, 0x88, 0xca, 0x1a, 0x0f, 0x83, 0x08, 0xd9,
0x3a, 0x5d, 0xaf, 0x3f, 0x0c, 0x48, 0x84, 0x2a, 0xda, 0xbc, 0xbe, 0xea, 0x9e, 0x45, 0x01, 0x72,
0xb4, 0xd9, 0x0d, 0xc2, 0x20, 0x0a, 0x90, 0xfb, 0xce, 0x72, 0x4a, 0xc8, 0x7c, 0x67, 0x39, 0x26,
0xb2, 0xbc, 0xdf, 0x0d, 0x78, 0x35, 0xe4, 0x19, 0x4d, 0xee, 0x2f, 0xe8, 0x8a, 0x24, 0xe9, 0x94,
0x12, 0xfa, 0xb0, 0xa4, 0x39, 0xc7, 0x27, 0xe0, 0x2c, 0x58, 0x3e, 0x13, 0xdc, 0x49, 0x82, 0x5d,
0x52, 0xd8, 0xb8, 0x05, 0xee, 0x1d, 0x5d, 0xc5, 0x99, 0x88, 0xd7, 0x84, 0xe1, 0x66, 0xb1, 0x90,
0x45, 0x26, 0xe7, 0x4e, 0xa3, 0x6d, 0x7e, 0xcd, 0x7f, 0xe7, 0xd7, 0xbb, 0x81, 0xe3, 0x4f, 0x8b,
0xca, 0x17, 0x2c, 0xcd, 0x29, 0x0e, 0x01, 0xab, 0x8b, 0x31, 0xdf, 0xcc, 0x56, 0xd6, 0x57, 0x6d,
0x7f, 0xfd, 0xc5, 0x05, 0x20, 0x07, 0xa3, 0x4f, 0x8f, 0xbc, 0x5f, 0xe0, 0x50, 0xbd, 0x13, 0x25,
0xa3, 0x39, 0xcd, 0x9f, 0xd3, 0xfa, 0x31, 0xd8, 0x5c, 0x06, 0xd7, 0x4a, 0x75, 0xb3, 0xe1, 0x12,
0x6d, 0xfd, 0xd7, 0x0e, 0x27, 0x70, 0xb4, 0xfb, 0xf2, 0xff, 0xd2, 0xdf, 0xf7, 0x60, 0x91, 0xe5,
0x9c, 0xe2, 0x23, 0x28, 0xdf, 0x27, 0x7c, 0x7c, 0xab, 0xbb, 0x51, 0x86, 0x68, 0xe5, 0x66, 0x36,
0xe7, 0x34, 0x93, 0x23, 0x74, 0x89, 0xb6, 0xbc, 0x37, 0x60, 0xff, 0x24, 0x11, 0xfe, 0x06, 0xca,
0xd9, 0x52, 0xf4, 0xaa, 0x3e, 0x75, 0xb4, 0x5d, 0x80, 0x48, 0x4c, 0x94, 0xdb, 0xfb, 0xdb, 0x80,
0x97, 0xaa, 0xa0, 0x21, 0x5b, 0x66, 0x63, 0x2a, 0x18, 0xbc, 0xa3, 0xab, 0x7c, 0x91, 0x8c, 0xe9,
0x9a, 0xc1, 0xb5, 0x2d, 0x8a, 0xc9, 0x6f, 0x93, 0x6c, 0xa2, 0x5f, 0x55, 0x06, 0xfe, 0x01, 0xaa,
0x92, 0x49, 0x1e, 0xf3, 0xd5, 0x82, 0x4a, 0x0e, 0xf7, 0xda, 0x47, 0x9b, 0xa5, 0x92, 0x3c, 0xf1,
0x68, 0xb5, 0xa0, 0x04, 0x78, 0x81, 0x77, 0x37, 0xd1, 0x7a, 0xc6, 0x26, 0x6e, 0xe6, 0x57, 0xde,
0x99, 0xdf, 0x69, 0x41, 0x86, 0xad, 0xb3, 0x6c, 0xf5, 0xaa, 0xe8, 0x28, 0x08, 0xba, 0x02, 0x87,
0xb0, 0x27, 0x29, 0x53, 0xd8, 0x03, 0x7b, 0x44, 0x6f, 0x58, 0x46, 0xf5, 0x90, 0x40, 0x8b, 0x18,
0x61, 0x4f, 0x44, 0x7b, 0x70, 0x1d, 0xca, 0xc9, 0xcd, 0x9a, 0xe7, 0xdd, 0x10, 0xe5, 0xf0, 0x7e,
0x33, 0xc0, 0x7e, 0xaf, 0x12, 0x9e, 0x82, 0x25, 0x19, 0x50, 0xa2, 0x76, 0xbc, 0x5d, 0x86, 0x8a,
0x90, 0x1c, 0xc8, 0x18, 0xa1, 0xe8, 0x53, 0x3e, 0x5b, 0x33, 0x29, 0xb1, 0xd0, 0xa6, 0xc9, 0x44,
0x69, 0x93, 0x4b, 0x04, 0xc4, 0x6f, 0x01, 0x32, 0xf6, 0x14, 0x4b, 0x29, 0xcd, 0x6b, 0x96, 0x1c,
0xe5, 0xd1, 0xce, 0x28, 0x75, 0x33, 0xc4, 0xcd, 0x34, 0xca, 0xbd, 0x9f, 0x61, 0xef, 0xbd, 0xda,
0xd0, 0xe7, 0x7c, 0x15, 0xa7, 0x3b, 0xab, 0xf4, 0x65, 0xf6, 0x7e, 0x84, 0xfd, 0x22, 0xb3, 0xde,
0xfa, 0x06, 0x94, 0x65, 0x75, 0x7a, 0xcf, 0xf0, 0xe7, 0x4d, 0x13, 0x15, 0x70, 0xfa, 0xab, 0x01,
0xb0, 0xa1, 0x01, 0x57, 0xa1, 0x72, 0xdd, 0xbf, 0xe8, 0x0f, 0x3e, 0xf4, 0xd1, 0x0b, 0xec, 0x80,
0x75, 0x1e, 0xf5, 0xba, 0xc8, 0xc0, 0x2e, 0x94, 0x95, 0xd0, 0x96, 0x84, 0x4a, 0x6a, 0x95, 0x35,
0x85, 0x04, 0x17, 0x12, 0x6b, 0xe1, 0x0a, 0x98, 0x85, 0x90, 0x6a, 0xe5, 0xb4, 0x05, 0xd6, 0xb2,
0x59, 0x11, 0x58, 0x6b, 0xa6, 0x23, 0x82, 0x85, 0xd2, 0xba, 0x02, 0x90, 0xc1, 0x07, 0x04, 0xfe,
0xb7, 0x1f, 0x5f, 0x3f, 0xce, 0x38, 0xcd, 0xf3, 0xe6, 0x8c, 0xb5, 0x14, 0x6a, 0x4d, 0x59, 0xeb,
0x91, 0xb7, 0xe4, 0x0f, 0xb9, 0xb5, 0x69, 0x61, 0x64, 0xcb, 0x93, 0xb7, 0xff, 0x04, 0x00, 0x00,
0xff, 0xff, 0x70, 0xe7, 0x5c, 0xa7, 0xdf, 0x07, 0x00, 0x00,
}

Просмотреть файл

@ -46,6 +46,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/tableacl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
@ -60,6 +61,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
@ -170,6 +172,7 @@ type TabletServer struct {
hr *heartbeat.Reader
messager *messager.Engine
watcher *ReplicationWatcher
vstreamer *vstreamer.Engine
updateStreamList *binlog.StreamList
// checkMySQLThrottler is used to throttle the number of
@ -238,6 +241,7 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali
tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer)
tsv.messager = messager.NewEngine(tsv, tsv.se, config)
tsv.watcher = NewReplicationWatcher(tsv.se, config)
tsv.vstreamer = vstreamer.NewEngine(srvtopo.NewResilientServer(topoServer, "TabletSrvTopo"), tsv.se)
tsv.updateStreamList = &binlog.StreamList{}
// FIXME(alainjobart) could we move this to the Register method below?
// So that vtcombo doesn't even call it once, on the first tablet.
@ -348,6 +352,7 @@ func (tsv *TabletServer) InitDBConfig(target querypb.Target, dbcfgs *dbconfigs.D
tsv.hr.InitDBConfig(tsv.dbconfigs)
tsv.messager.InitDBConfig(tsv.dbconfigs)
tsv.watcher.InitDBConfig(tsv.dbconfigs)
tsv.vstreamer.InitDBConfig(tsv.dbconfigs)
return nil
}
@ -513,6 +518,7 @@ func (tsv *TabletServer) fullStart() (err error) {
}
tsv.hr.Init(tsv.target)
tsv.updateStreamList.Init()
tsv.vstreamer.Open(tsv.target.Keyspace, tsv.alias.Cell)
return tsv.serveNewType()
}
@ -577,6 +583,7 @@ func (tsv *TabletServer) StopService() {
tsv.se.Close()
tsv.hw.Close()
tsv.hr.Close()
tsv.vstreamer.Close()
log.Infof("Shutdown complete.")
tsv.transition(StateNotConnected)
}

Просмотреть файл

@ -23,6 +23,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/topo"
@ -37,6 +38,9 @@ var vschemaErrors *stats.Counter
// Engine is the engine for handling vseplication streaming requests.
type Engine struct {
// cp is initialized by InitDBConfig
cp *mysql.ConnParams
// mu protects isOpen, streamers, streamIdx and kschema.
mu sync.Mutex
@ -69,6 +73,11 @@ func NewEngine(ts srvtopo.Server, se *schema.Engine) *Engine {
}
}
// InitDBConfig performs saves the required info from dbconfigs for future use.
func (vse *Engine) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) {
vse.cp = dbcfgs.DbaWithDB()
}
// Open starts the Engine service.
func (vse *Engine) Open(keyspace, cell string) error {
vse.mu.Lock()
@ -103,7 +112,7 @@ func (vse *Engine) Close() {
}
// Stream starts a new stream.
func (vse *Engine) Stream(ctx context.Context, cp *mysql.ConnParams, startPos mysql.Position, filter *binlogdatapb.Filter, send func(*binlogdatapb.VEvent) error) error {
func (vse *Engine) Stream(ctx context.Context, startPos mysql.Position, filter *binlogdatapb.Filter, send func(*binlogdatapb.VEvent) error) error {
// Ensure kschema is initialized and the watcher is started.
vse.watcherOnce.Do(vse.setWatch)
@ -114,7 +123,7 @@ func (vse *Engine) Stream(ctx context.Context, cp *mysql.ConnParams, startPos my
if !vse.isOpen {
return nil, 0, errors.New("VStreamer is not open")
}
streamer := newVStreamer(ctx, cp, vse.se, startPos, filter, vse.kschema, send)
streamer := newVStreamer(ctx, vse.cp, vse.se, startPos, filter, vse.kschema, send)
idx := vse.streamIdx
vse.streamers[idx] = streamer
vse.streamIdx++

Просмотреть файл

@ -93,6 +93,8 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
if plan.Vindex == nil {
return true, result, nil
}
// Filter by Vindex.
destinations, err := plan.Vindex.Map(nil, []sqltypes.Value{result[plan.VindexColumn]})
if err != nil {
return false, nil, err
@ -154,6 +156,8 @@ func buildREPlan(ti *Table, kschema *vindexes.KeyspaceSchema, filter string) (*P
if len(table.ColumnVindexes) == 0 {
return nil, fmt.Errorf("table %s has no primary vindex", ti.Name)
}
// findColumn can be used here because result column list is same
// as source.
colnum, err := findColumn(ti, table.ColumnVindexes[0].Columns[0])
if err != nil {
return nil, err
@ -208,11 +212,22 @@ func buildTablePlan(ti *Table, kschema *vindexes.KeyspaceSchema, query string) (
}
plan.ColExprs = append(plan.ColExprs, cExpr)
}
} else {
if len(sel.SelectExprs) != 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel))
}
plan.ColExprs = make([]ColExpr, len(ti.Columns))
for i, col := range ti.Columns {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = col.Name
}
}
if sel.Where == nil {
return plan, nil
}
// Filter by Vindex.
funcExpr, ok := sel.Where.Expr.(*sqlparser.FuncExpr)
if !ok {
return nil, fmt.Errorf("unexpected where clause: %v", sqlparser.String(sel.Where))

Просмотреть файл

@ -136,11 +136,17 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
if !ok {
return fmt.Errorf("server EOF")
}
if err := vs.parseEvent(ev); err != nil {
vevents, err := vs.parseEvent(ev)
if err != nil {
return err
}
for _, vevent := range vevents {
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending event: %v", err)
}
}
case vs.kschema = <-vs.kevents:
if err := vs.updatePlans(); err != nil {
if err := vs.rebuildPlans(); err != nil {
return err
}
case <-ctx.Done():
@ -149,10 +155,10 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
}
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error {
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
// Validate the buffer before reading fields from it.
if !ev.IsValid() {
return fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
// We need to keep checking for FORMAT_DESCRIPTION_EVENT even after we've
@ -162,9 +168,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error {
var err error
vs.format, err = ev.Format()
if err != nil {
return fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev)
return nil, fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev)
}
return nil
return nil, nil
}
// We can't parse anything until we get a FORMAT_DESCRIPTION_EVENT that
@ -174,82 +180,70 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error {
// is a fake ROTATE_EVENT, which the master sends to tell us the name
// of the current log file.
if ev.IsRotate() {
return nil
return nil, nil
}
return fmt.Errorf("got a real event before FORMAT_DESCRIPTION_EVENT: %#v", ev)
return nil, fmt.Errorf("got a real event before FORMAT_DESCRIPTION_EVENT: %#v", ev)
}
// Strip the checksum, if any. We don't actually verify the checksum, so discard it.
ev, _, err := ev.StripChecksum(vs.format)
if err != nil {
return fmt.Errorf("can't strip checksum from binlog event: %v, event data: %#v", err, ev)
return nil, fmt.Errorf("can't strip checksum from binlog event: %v, event data: %#v", err, ev)
}
var vevents []*binlogdatapb.VEvent
switch {
case ev.IsPseudo() || ev.IsGTID():
gtid, hasBegin, err := ev.GTID(vs.format)
if err != nil {
return fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev)
}
vs.pos = mysql.AppendGTID(vs.pos, gtid)
vevent := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_GTID,
Gtid: mysql.EncodePosition(vs.pos),
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending GTID: %v", err)
return nil, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev)
}
if hasBegin {
vevent := &binlogdatapb.VEvent{
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_BEGIN,
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending BEGIN: %v", err)
}
})
}
vs.pos = mysql.AppendGTID(vs.pos, gtid)
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_GTID,
Gtid: mysql.EncodePosition(vs.pos),
})
case ev.IsXID():
vevent := &binlogdatapb.VEvent{
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_COMMIT,
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending COMMIT: %v", err)
}
})
case ev.IsQuery():
q, err := ev.Query(vs.format)
if err != nil {
return fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev)
return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev)
}
var vevent *binlogdatapb.VEvent
switch cat := getStatementCategory(q.SQL); cat {
case binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_ROLLBACK:
vevent = &binlogdatapb.VEvent{
vevents = append(vevents, &binlogdatapb.VEvent{
Type: cat,
}
})
case binlogdatapb.VEventType_DDL:
vevent = &binlogdatapb.VEvent{
vevents = append(vevents, &binlogdatapb.VEvent{
Type: cat,
Ddl: q.SQL,
}
})
default:
return fmt.Errorf("unexpected event type %v in row-based replication: %#v", cat, ev)
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending COMMIT: %v", err)
return nil, fmt.Errorf("unexpected event type %v in row-based replication: %#v", cat, ev)
}
case ev.IsTableMap():
id := ev.TableID(vs.format)
tm, err := ev.TableMap(vs.format)
if err != nil {
return err
return nil, err
}
if tm.Database != "" && tm.Database != vs.cp.DbName {
return nil
return nil, nil
}
ti := vs.se.GetTable(sqlparser.NewTableIdent(tm.Name))
if ti == nil {
return fmt.Errorf("unknown table %v in schema", tm.Name)
return nil, fmt.Errorf("unknown table %v in schema", tm.Name)
}
if len(ti.Columns) < len(tm.Types) {
return fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(ti.Columns), ev)
return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(ti.Columns), ev)
}
table := &Table{
TableMap: tm,
@ -257,88 +251,35 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error {
}
plan, err := buildPlan(table, vs.kschema, vs.filter)
if err != nil {
return err
return nil, err
}
if plan != nil {
vs.plans[id] = plan
}
case ev.IsWriteRows():
case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows():
// The existence of before and after images can be used to
// identify statememt types. It's also possible that the
// before and after images end up going to different shards.
// If so, an update will be treated as delete on one shard
// and insert on the other.
id := ev.TableID(vs.format)
plan, ok := vs.plans[id]
if !ok {
return nil
return nil, nil
}
rows, err := ev.Rows(vs.format, plan.Table.TableMap)
if err != nil {
return err
}
rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows))
for _, row := range rows.Rows {
ok, values, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns)
if err != nil {
return err
}
if !ok {
continue
}
encoded := sqltypes.RowToProto3(values)
rowEvents = append(rowEvents, &binlogdatapb.RowEvent{After: encoded})
}
vevent := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_INSERT,
RowEvents: rowEvents,
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending INSERT: %v", err)
}
case ev.IsDeleteRows():
id := ev.TableID(vs.format)
plan, ok := vs.plans[id]
if !ok {
return nil
}
rows, err := ev.Rows(vs.format, plan.Table.TableMap)
if err != nil {
return err
}
rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows))
for _, row := range rows.Rows {
ok, values, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns)
if err != nil {
return err
}
if !ok {
continue
}
encoded := sqltypes.RowToProto3(values)
rowEvents = append(rowEvents, &binlogdatapb.RowEvent{Before: encoded})
}
vevent := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_DELETE,
RowEvents: rowEvents,
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending DELETE: %v", err)
}
case ev.IsUpdateRows():
id := ev.TableID(vs.format)
plan, ok := vs.plans[id]
if !ok {
return nil
}
rows, err := ev.Rows(vs.format, plan.Table.TableMap)
if err != nil {
return err
return nil, err
}
rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows))
for _, row := range rows.Rows {
beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns)
if err != nil {
return err
return nil, err
}
afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns)
if err != nil {
return err
return nil, err
}
if !beforeOK && !afterOK {
continue
@ -352,18 +293,15 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error {
}
rowEvents = append(rowEvents, rowEvent)
}
vevent := &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_UPDATE,
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_ROW,
RowEvents: rowEvents,
}
if err := vs.send(vevent); err != nil {
return fmt.Errorf("error sending UPDATE: %v", err)
}
})
}
return nil
return vevents, nil
}
func (vs *vstreamer) updatePlans() error {
func (vs *vstreamer) rebuildPlans() error {
for id, plan := range vs.plans {
newPlan, err := buildPlan(plan.Table, vs.kschema, vs.filter)
if err != nil {
@ -375,12 +313,15 @@ func (vs *vstreamer) updatePlans() error {
}
func (vs *vstreamer) extractRowAndFilter(plan *Plan, data []byte, dataColumns, nullColumns mysql.Bitmap) (bool, []sqltypes.Value, error) {
if len(data) == 0 {
return false, nil, nil
}
values := make([]sqltypes.Value, dataColumns.Count())
valueIndex := 0
pos := 0
for colNum := 0; colNum < dataColumns.Count(); colNum++ {
if !dataColumns.Bit(colNum) {
continue
return false, nil, fmt.Errorf("partial row image encountered: ensure binlog_row_image is set to 'full'")
}
if nullColumns.Bit(valueIndex) {
valueIndex++

Просмотреть файл

@ -156,6 +156,8 @@ message BinlogSource {
}
// VEventType enumerates the event types.
// This list is comprehensive. Many of these types
// will not be encountered in RBR mode.
enum VEventType {
UNKNOWN = 0;
GTID = 1;
@ -167,6 +169,7 @@ enum VEventType {
UPDATE = 7;
DELETE = 8;
SET = 9;
ROW = 10;
}
message RowEvent {

Просмотреть файл

@ -22,7 +22,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='binlogdata',
syntax='proto3',
serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'),
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"A\n\x08RowEvent\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"s\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12(\n\nrow_events\x18\x04 \x03(\x0b\x32\x14.binlogdata.RowEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*~\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\n\n\x06UPDATE\x10\x07\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x07\n\x03SET\x10\tB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"A\n\x08RowEvent\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"s\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12(\n\nrow_events\x18\x04 \x03(\x0b\x32\x14.binlogdata.RowEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*\x87\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\n\n\x06UPDATE\x10\x07\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x07\n\x03SET\x10\t\x12\x07\n\x03ROW\x10\nB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
,
dependencies=[query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,])
@ -72,11 +72,15 @@ _VEVENTTYPE = _descriptor.EnumDescriptor(
name='SET', index=9, number=9,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='ROW', index=10, number=10,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=1517,
serialized_end=1643,
serialized_start=1518,
serialized_end=1653,
)
_sym_db.RegisterEnumDescriptor(_VEVENTTYPE)
@ -91,6 +95,7 @@ INSERT = 6
UPDATE = 7
DELETE = 8
SET = 9
ROW = 10
_BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor(