From 8cd1c5b33f0cf1af9702f2c2eaaa448ded1e82ea Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Sat, 29 Dec 2018 14:37:24 -0800 Subject: [PATCH] vreplication: tabletserver inits and other tweaks Signed-off-by: Sugu Sougoumarane --- go/vt/proto/binlogdata/binlogdata.pb.go | 175 +++++++++--------- go/vt/vttablet/tabletserver/tabletserver.go | 7 + .../vttablet/tabletserver/vstreamer/engine.go | 13 +- .../tabletserver/vstreamer/planbuilder.go | 15 ++ .../tabletserver/vstreamer/streamer.go | 171 ++++++----------- proto/binlogdata.proto | 3 + py/vtproto/binlogdata_pb2.py | 11 +- 7 files changed, 190 insertions(+), 205 deletions(-) diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index d2dd885698..ecfbf2ee43 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -21,6 +21,8 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // VEventType enumerates the event types. +// This list is comprehensive. Many of these types +// will not be encountered in RBR mode. type VEventType int32 const ( @@ -34,19 +36,21 @@ const ( VEventType_UPDATE VEventType = 7 VEventType_DELETE VEventType = 8 VEventType_SET VEventType = 9 + VEventType_ROW VEventType = 10 ) var VEventType_name = map[int32]string{ - 0: "UNKNOWN", - 1: "GTID", - 2: "BEGIN", - 3: "COMMIT", - 4: "ROLLBACK", - 5: "DDL", - 6: "INSERT", - 7: "UPDATE", - 8: "DELETE", - 9: "SET", + 0: "UNKNOWN", + 1: "GTID", + 2: "BEGIN", + 3: "COMMIT", + 4: "ROLLBACK", + 5: "DDL", + 6: "INSERT", + 7: "UPDATE", + 8: "DELETE", + 9: "SET", + 10: "ROW", } var VEventType_value = map[string]int32{ "UNKNOWN": 0, @@ -59,13 +63,14 @@ var VEventType_value = map[string]int32{ "UPDATE": 7, "DELETE": 8, "SET": 9, + "ROW": 10, } func (x VEventType) String() string { return proto.EnumName(VEventType_name, int32(x)) } func (VEventType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{0} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{0} } type BinlogTransaction_Statement_Category int32 @@ -113,7 +118,7 @@ func (x BinlogTransaction_Statement_Category) String() string { return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x)) } func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1, 0, 0} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1, 0, 0} } // Charset is the per-statement charset info from a QUERY_EVENT binlog entry. @@ -133,7 +138,7 @@ func (m *Charset) Reset() { *m = Charset{} } func (m *Charset) String() string { return proto.CompactTextString(m) } func (*Charset) ProtoMessage() {} func (*Charset) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{0} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{0} } func (m *Charset) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Charset.Unmarshal(m, b) @@ -190,7 +195,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} } func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction) ProtoMessage() {} func (*BinlogTransaction) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1} } func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b) @@ -240,7 +245,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) } func (*BinlogTransaction_Statement) ProtoMessage() {} func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{1, 0} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{1, 0} } func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b) @@ -298,7 +303,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} } func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeRequest) ProtoMessage() {} func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{2} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{2} } func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b) @@ -351,7 +356,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{} func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) } func (*StreamKeyRangeResponse) ProtoMessage() {} func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{3} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{3} } func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b) @@ -395,7 +400,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} } func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) } func (*StreamTablesRequest) ProtoMessage() {} func (*StreamTablesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{4} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{4} } func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b) @@ -448,7 +453,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} } func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) } func (*StreamTablesResponse) ProtoMessage() {} func (*StreamTablesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{5} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{5} } func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b) @@ -493,7 +498,7 @@ func (m *Rule) Reset() { *m = Rule{} } func (m *Rule) String() string { return proto.CompactTextString(m) } func (*Rule) ProtoMessage() {} func (*Rule) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{6} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{6} } func (m *Rule) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Rule.Unmarshal(m, b) @@ -540,7 +545,7 @@ func (m *Filter) Reset() { *m = Filter{} } func (m *Filter) String() string { return proto.CompactTextString(m) } func (*Filter) ProtoMessage() {} func (*Filter) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{7} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{7} } func (m *Filter) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Filter.Unmarshal(m, b) @@ -593,7 +598,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} } func (m *BinlogSource) String() string { return proto.CompactTextString(m) } func (*BinlogSource) ProtoMessage() {} func (*BinlogSource) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{8} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{8} } func (m *BinlogSource) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BinlogSource.Unmarshal(m, b) @@ -667,7 +672,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} } func (m *RowEvent) String() string { return proto.CompactTextString(m) } func (*RowEvent) ProtoMessage() {} func (*RowEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{9} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{9} } func (m *RowEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RowEvent.Unmarshal(m, b) @@ -716,7 +721,7 @@ func (m *VEvent) Reset() { *m = VEvent{} } func (m *VEvent) String() string { return proto.CompactTextString(m) } func (*VEvent) ProtoMessage() {} func (*VEvent) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{10} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{10} } func (m *VEvent) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VEvent.Unmarshal(m, b) @@ -777,7 +782,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} } func (m *VStreamRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRequest) ProtoMessage() {} func (*VStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{11} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{11} } func (m *VStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRequest.Unmarshal(m, b) @@ -823,7 +828,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} } func (m *VStreamResponse) String() string { return proto.CompactTextString(m) } func (*VStreamResponse) ProtoMessage() {} func (*VStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_binlogdata_9f7a55f16227bf1c, []int{12} + return fileDescriptor_binlogdata_41dfc977aeca6f81, []int{12} } func (m *VStreamResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamResponse.Unmarshal(m, b) @@ -869,65 +874,65 @@ func init() { proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value) } -func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_9f7a55f16227bf1c) } +func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_41dfc977aeca6f81) } -var fileDescriptor_binlogdata_9f7a55f16227bf1c = []byte{ - // 900 bytes of a gzipped FileDescriptorProto +var fileDescriptor_binlogdata_41dfc977aeca6f81 = []byte{ + // 906 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0xcd, 0x6e, 0xdb, 0x46, - 0x10, 0x0e, 0x45, 0x8a, 0x22, 0x47, 0xae, 0x4d, 0xaf, 0x1d, 0x43, 0x30, 0x50, 0x40, 0xe0, 0xa1, - 0x51, 0x0d, 0x54, 0x0a, 0x94, 0xf6, 0xd4, 0x93, 0x29, 0xb1, 0x86, 0x62, 0x5a, 0x36, 0x56, 0x74, - 0x5a, 0xe4, 0x42, 0x50, 0xd2, 0x5a, 0x16, 0x2c, 0x73, 0x65, 0xee, 0xca, 0xae, 0x2e, 0x7d, 0x85, - 0x1e, 0xfa, 0x14, 0x7d, 0x91, 0xbe, 0x49, 0x6f, 0x7d, 0x88, 0x62, 0x7f, 0x44, 0x49, 0x09, 0xe0, - 0xba, 0x87, 0xdc, 0xbe, 0xd9, 0x99, 0x9d, 0x9d, 0xf9, 0x66, 0xf8, 0x11, 0xbc, 0xe1, 0x34, 0x9b, - 0xd1, 0xc9, 0x38, 0xe5, 0x69, 0x73, 0x9e, 0x53, 0x4e, 0x11, 0xac, 0x4f, 0x8e, 0xab, 0x0f, 0x0b, - 0x92, 0x2f, 0x95, 0xe3, 0x78, 0x97, 0xd3, 0x39, 0x5d, 0x07, 0xfa, 0x17, 0x50, 0xe9, 0xdc, 0xa6, - 0x39, 0x23, 0x1c, 0x1d, 0x81, 0x3d, 0x9a, 0x4d, 0x49, 0xc6, 0x6b, 0x46, 0xdd, 0x68, 0x94, 0xb1, - 0xb6, 0x10, 0x02, 0x6b, 0x44, 0xb3, 0xac, 0x56, 0x92, 0xa7, 0x12, 0x8b, 0x58, 0x46, 0xf2, 0x47, - 0x92, 0xd7, 0x4c, 0x15, 0xab, 0x2c, 0xff, 0x6f, 0x13, 0xf6, 0x03, 0xf9, 0x74, 0x9c, 0xa7, 0x19, - 0x4b, 0x47, 0x7c, 0x4a, 0x33, 0x74, 0x06, 0xc0, 0x78, 0xca, 0xc9, 0x3d, 0xc9, 0x38, 0xab, 0x19, - 0x75, 0xb3, 0x51, 0x6d, 0xbf, 0x69, 0x6e, 0x14, 0xfd, 0xd9, 0x95, 0xe6, 0x60, 0x15, 0x8f, 0x37, - 0xae, 0xa2, 0x36, 0x54, 0xc9, 0x23, 0xc9, 0x78, 0xc2, 0xe9, 0x1d, 0xc9, 0x6a, 0x56, 0xdd, 0x68, - 0x54, 0xdb, 0xfb, 0x4d, 0xd5, 0x60, 0x28, 0x3c, 0xb1, 0x70, 0x60, 0x20, 0x05, 0x3e, 0xfe, 0xab, - 0x04, 0x6e, 0x91, 0x0d, 0x45, 0xe0, 0x8c, 0x52, 0x4e, 0x26, 0x34, 0x5f, 0xca, 0x36, 0x77, 0xdb, - 0x6f, 0x5f, 0x58, 0x48, 0xb3, 0xa3, 0xef, 0xe1, 0x22, 0x03, 0xfa, 0x0e, 0x2a, 0x23, 0xc5, 0x9e, - 0x64, 0xa7, 0xda, 0x3e, 0xd8, 0x4c, 0xa6, 0x89, 0xc5, 0xab, 0x18, 0xe4, 0x81, 0xc9, 0x1e, 0x66, - 0x92, 0xb2, 0x1d, 0x2c, 0xa0, 0xff, 0xa7, 0x01, 0xce, 0x2a, 0x2f, 0x3a, 0x80, 0xbd, 0x20, 0x4a, - 0xae, 0xfb, 0x38, 0xec, 0x5c, 0x9e, 0xf5, 0x7b, 0x1f, 0xc3, 0xae, 0xf7, 0x0a, 0xed, 0x80, 0x13, - 0x44, 0x49, 0x10, 0x9e, 0xf5, 0xfa, 0x9e, 0x81, 0xbe, 0x02, 0x37, 0x88, 0x92, 0xce, 0xe5, 0xc5, - 0x45, 0x2f, 0xf6, 0x4a, 0x68, 0x0f, 0xaa, 0x41, 0x94, 0xe0, 0xcb, 0x28, 0x0a, 0x4e, 0x3b, 0xe7, - 0x9e, 0x89, 0x5e, 0xc3, 0x7e, 0x10, 0x25, 0xdd, 0x8b, 0x28, 0xe9, 0x86, 0x57, 0x38, 0xec, 0x9c, - 0xc6, 0x61, 0xd7, 0xb3, 0x10, 0x80, 0x2d, 0x8e, 0xbb, 0x91, 0x57, 0xd6, 0x78, 0x10, 0xc6, 0x9e, - 0xad, 0xd3, 0xf5, 0xfa, 0x83, 0x10, 0xc7, 0x5e, 0x45, 0x9b, 0xd7, 0x57, 0xdd, 0xd3, 0x38, 0xf4, - 0x1c, 0x6d, 0x76, 0xc3, 0x28, 0x8c, 0x43, 0xcf, 0x7d, 0x6f, 0x39, 0x25, 0xcf, 0x7c, 0x6f, 0x39, - 0xa6, 0x67, 0xf9, 0x7f, 0x18, 0xf0, 0x7a, 0xc0, 0x73, 0x92, 0xde, 0x9f, 0x93, 0x25, 0x4e, 0xb3, - 0x09, 0xc1, 0xe4, 0x61, 0x41, 0x18, 0x47, 0xc7, 0xe0, 0xcc, 0x29, 0x9b, 0x0a, 0xee, 0x24, 0xc1, - 0x2e, 0x2e, 0x6c, 0xd4, 0x02, 0xf7, 0x8e, 0x2c, 0x93, 0x5c, 0xc4, 0x6b, 0xc2, 0x50, 0xb3, 0x58, - 0xc8, 0x22, 0x93, 0x73, 0xa7, 0xd1, 0x26, 0xbf, 0xe6, 0x7f, 0xf3, 0xeb, 0xdf, 0xc0, 0xd1, 0xa7, - 0x45, 0xb1, 0x39, 0xcd, 0x18, 0x41, 0x11, 0x20, 0x75, 0x31, 0xe1, 0xeb, 0xd9, 0xca, 0xfa, 0xaa, - 0xed, 0xaf, 0x9f, 0x5d, 0x00, 0xbc, 0x3f, 0xfc, 0xf4, 0xc8, 0xff, 0x15, 0x0e, 0xd4, 0x3b, 0x71, - 0x3a, 0x9c, 0x11, 0xf6, 0x92, 0xd6, 0x8f, 0xc0, 0xe6, 0x32, 0xb8, 0x56, 0xaa, 0x9b, 0x0d, 0x17, - 0x6b, 0xeb, 0xff, 0x76, 0x38, 0x86, 0xc3, 0xed, 0x97, 0xbf, 0x48, 0x7f, 0xdf, 0x83, 0x85, 0x17, - 0x33, 0x82, 0x0e, 0xa1, 0x7c, 0x9f, 0xf2, 0xd1, 0xad, 0xee, 0x46, 0x19, 0xa2, 0x95, 0x9b, 0xe9, - 0x8c, 0x93, 0x5c, 0x8e, 0xd0, 0xc5, 0xda, 0xf2, 0xdf, 0x82, 0xfd, 0x93, 0x44, 0xe8, 0x1b, 0x28, - 0xe7, 0x0b, 0xd1, 0xab, 0xfa, 0xd4, 0xbd, 0xcd, 0x02, 0x44, 0x62, 0xac, 0xdc, 0xfe, 0x3f, 0x06, - 0xec, 0xa8, 0x82, 0x06, 0x74, 0x91, 0x8f, 0x88, 0x60, 0xf0, 0x8e, 0x2c, 0xd9, 0x3c, 0x1d, 0x91, - 0x15, 0x83, 0x2b, 0x5b, 0x14, 0xc3, 0x6e, 0xd3, 0x7c, 0xac, 0x5f, 0x55, 0x06, 0xfa, 0x01, 0xaa, - 0x92, 0x49, 0x9e, 0xf0, 0xe5, 0x9c, 0x48, 0x0e, 0x77, 0xdb, 0x87, 0xeb, 0xa5, 0x92, 0x3c, 0xf1, - 0x78, 0x39, 0x27, 0x18, 0x78, 0x81, 0xb7, 0x37, 0xd1, 0x7a, 0xc1, 0x26, 0xae, 0xe7, 0x57, 0xde, - 0x9a, 0xdf, 0x49, 0x41, 0x86, 0xad, 0xb3, 0x6c, 0xf4, 0xaa, 0xe8, 0x28, 0x08, 0xba, 0x02, 0x07, - 0xd3, 0x27, 0x29, 0x53, 0xc8, 0x07, 0x7b, 0x48, 0x6e, 0x68, 0x4e, 0xf4, 0x90, 0x40, 0x8b, 0x18, - 0xa6, 0x4f, 0x58, 0x7b, 0x50, 0x1d, 0xca, 0xe9, 0xcd, 0x8a, 0xe7, 0xed, 0x10, 0xe5, 0xf0, 0x7f, - 0x37, 0xc0, 0xfe, 0xa0, 0x12, 0x9e, 0x80, 0x25, 0x19, 0x50, 0xa2, 0x76, 0xb4, 0x59, 0x86, 0x8a, - 0x90, 0x1c, 0xc8, 0x18, 0xa1, 0xe8, 0x13, 0x3e, 0x5d, 0x31, 0x29, 0xb1, 0xd0, 0xa6, 0xf1, 0x58, - 0x69, 0x93, 0x8b, 0x05, 0x44, 0xef, 0x00, 0x72, 0xfa, 0x94, 0x48, 0x29, 0x65, 0x35, 0x4b, 0x8e, - 0xf2, 0x70, 0x6b, 0x94, 0xba, 0x19, 0xec, 0xe6, 0x1a, 0x31, 0xff, 0x17, 0xd8, 0xfd, 0xa0, 0x36, - 0xf4, 0x25, 0x5f, 0xc5, 0xc9, 0xd6, 0x2a, 0x3d, 0xcf, 0xde, 0x8f, 0xb0, 0x57, 0x64, 0xd6, 0x5b, - 0xdf, 0x80, 0xb2, 0xac, 0x4e, 0xef, 0x19, 0xfa, 0xbc, 0x69, 0xac, 0x02, 0x4e, 0x7e, 0x03, 0x58, - 0xb3, 0x80, 0xaa, 0x50, 0xb9, 0xee, 0x9f, 0xf7, 0x2f, 0x7f, 0xee, 0x7b, 0xaf, 0x90, 0x03, 0xd6, - 0x59, 0xdc, 0xeb, 0x7a, 0x06, 0x72, 0xa1, 0xac, 0x74, 0xb6, 0x24, 0x44, 0x52, 0x8b, 0xac, 0x29, - 0x14, 0xb8, 0x50, 0x58, 0x0b, 0x55, 0xc0, 0x2c, 0x74, 0x54, 0x0b, 0xa7, 0x2d, 0xb0, 0x56, 0xcd, - 0x8a, 0xc0, 0x5a, 0x32, 0x1d, 0x11, 0x2c, 0x84, 0xd6, 0x0d, 0xbe, 0xfd, 0xf8, 0xe6, 0x71, 0xca, - 0x09, 0x63, 0xcd, 0x29, 0x6d, 0x29, 0xd4, 0x9a, 0xd0, 0xd6, 0x23, 0x6f, 0xc9, 0xdf, 0x70, 0x6b, - 0x5d, 0xf8, 0xd0, 0x96, 0x27, 0xef, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, 0x94, 0x9d, 0xa9, 0x2b, - 0xd5, 0x07, 0x00, 0x00, + 0x10, 0x0e, 0x45, 0x8a, 0x22, 0x47, 0xa9, 0xbd, 0x5e, 0x3b, 0x86, 0x60, 0xa0, 0x80, 0xc0, 0x43, + 0xa3, 0x1a, 0xa8, 0x14, 0x28, 0xed, 0xa9, 0x27, 0x53, 0x62, 0x0d, 0xc5, 0xb4, 0x64, 0xac, 0xe8, + 0xa4, 0xc8, 0x85, 0xa0, 0xa4, 0xb5, 0x2c, 0x58, 0xe6, 0xca, 0xe4, 0xca, 0xae, 0x9e, 0xa0, 0xc7, + 0x1e, 0xfa, 0x14, 0x7d, 0x91, 0xbe, 0x49, 0x6f, 0x7d, 0x88, 0x62, 0x7f, 0x44, 0x49, 0x09, 0x90, + 0xba, 0x87, 0xde, 0xbe, 0xd9, 0x99, 0x9d, 0x9d, 0xf9, 0x66, 0xf8, 0x11, 0xd0, 0x68, 0x96, 0xce, + 0xd9, 0x74, 0x92, 0xf0, 0xa4, 0xb9, 0xc8, 0x18, 0x67, 0x18, 0x36, 0x27, 0x27, 0xd5, 0x87, 0x25, + 0xcd, 0x56, 0xca, 0x71, 0xb2, 0xc7, 0xd9, 0x82, 0x6d, 0x02, 0xbd, 0x4b, 0xa8, 0x74, 0x6e, 0x93, + 0x2c, 0xa7, 0x1c, 0x1f, 0x83, 0x3d, 0x9e, 0xcf, 0x68, 0xca, 0x6b, 0x46, 0xdd, 0x68, 0x94, 0x89, + 0xb6, 0x30, 0x06, 0x6b, 0xcc, 0xd2, 0xb4, 0x56, 0x92, 0xa7, 0x12, 0x8b, 0xd8, 0x9c, 0x66, 0x8f, + 0x34, 0xab, 0x99, 0x2a, 0x56, 0x59, 0xde, 0x5f, 0x26, 0x1c, 0xf8, 0xf2, 0xe9, 0x28, 0x4b, 0xd2, + 0x3c, 0x19, 0xf3, 0x19, 0x4b, 0xf1, 0x39, 0x40, 0xce, 0x13, 0x4e, 0xef, 0x69, 0xca, 0xf3, 0x9a, + 0x51, 0x37, 0x1b, 0xd5, 0xf6, 0xeb, 0xe6, 0x56, 0xd1, 0x9f, 0x5d, 0x69, 0x0e, 0xd7, 0xf1, 0x64, + 0xeb, 0x2a, 0x6e, 0x43, 0x95, 0x3e, 0xd2, 0x94, 0xc7, 0x9c, 0xdd, 0xd1, 0xb4, 0x66, 0xd5, 0x8d, + 0x46, 0xb5, 0x7d, 0xd0, 0x54, 0x0d, 0x06, 0xc2, 0x13, 0x09, 0x07, 0x01, 0x5a, 0xe0, 0x93, 0x3f, + 0x4b, 0xe0, 0x16, 0xd9, 0x70, 0x08, 0xce, 0x38, 0xe1, 0x74, 0xca, 0xb2, 0x95, 0x6c, 0x73, 0xaf, + 0xfd, 0xe6, 0x99, 0x85, 0x34, 0x3b, 0xfa, 0x1e, 0x29, 0x32, 0xe0, 0xef, 0xa0, 0x32, 0x56, 0xec, + 0x49, 0x76, 0xaa, 0xed, 0xc3, 0xed, 0x64, 0x9a, 0x58, 0xb2, 0x8e, 0xc1, 0x08, 0xcc, 0xfc, 0x61, + 0x2e, 0x29, 0x7b, 0x49, 0x04, 0xf4, 0xfe, 0x30, 0xc0, 0x59, 0xe7, 0xc5, 0x87, 0xb0, 0xef, 0x87, + 0xf1, 0x75, 0x9f, 0x04, 0x9d, 0xc1, 0x79, 0xbf, 0xf7, 0x31, 0xe8, 0xa2, 0x17, 0xf8, 0x25, 0x38, + 0x7e, 0x18, 0xfb, 0xc1, 0x79, 0xaf, 0x8f, 0x0c, 0xfc, 0x15, 0xb8, 0x7e, 0x18, 0x77, 0x06, 0x97, + 0x97, 0xbd, 0x08, 0x95, 0xf0, 0x3e, 0x54, 0xfd, 0x30, 0x26, 0x83, 0x30, 0xf4, 0xcf, 0x3a, 0x17, + 0xc8, 0xc4, 0xaf, 0xe0, 0xc0, 0x0f, 0xe3, 0xee, 0x65, 0x18, 0x77, 0x83, 0x2b, 0x12, 0x74, 0xce, + 0xa2, 0xa0, 0x8b, 0x2c, 0x0c, 0x60, 0x8b, 0xe3, 0x6e, 0x88, 0xca, 0x1a, 0x0f, 0x83, 0x08, 0xd9, + 0x3a, 0x5d, 0xaf, 0x3f, 0x0c, 0x48, 0x84, 0x2a, 0xda, 0xbc, 0xbe, 0xea, 0x9e, 0x45, 0x01, 0x72, + 0xb4, 0xd9, 0x0d, 0xc2, 0x20, 0x0a, 0x90, 0xfb, 0xce, 0x72, 0x4a, 0xc8, 0x7c, 0x67, 0x39, 0x26, + 0xb2, 0xbc, 0xdf, 0x0d, 0x78, 0x35, 0xe4, 0x19, 0x4d, 0xee, 0x2f, 0xe8, 0x8a, 0x24, 0xe9, 0x94, + 0x12, 0xfa, 0xb0, 0xa4, 0x39, 0xc7, 0x27, 0xe0, 0x2c, 0x58, 0x3e, 0x13, 0xdc, 0x49, 0x82, 0x5d, + 0x52, 0xd8, 0xb8, 0x05, 0xee, 0x1d, 0x5d, 0xc5, 0x99, 0x88, 0xd7, 0x84, 0xe1, 0x66, 0xb1, 0x90, + 0x45, 0x26, 0xe7, 0x4e, 0xa3, 0x6d, 0x7e, 0xcd, 0x7f, 0xe7, 0xd7, 0xbb, 0x81, 0xe3, 0x4f, 0x8b, + 0xca, 0x17, 0x2c, 0xcd, 0x29, 0x0e, 0x01, 0xab, 0x8b, 0x31, 0xdf, 0xcc, 0x56, 0xd6, 0x57, 0x6d, + 0x7f, 0xfd, 0xc5, 0x05, 0x20, 0x07, 0xa3, 0x4f, 0x8f, 0xbc, 0x5f, 0xe0, 0x50, 0xbd, 0x13, 0x25, + 0xa3, 0x39, 0xcd, 0x9f, 0xd3, 0xfa, 0x31, 0xd8, 0x5c, 0x06, 0xd7, 0x4a, 0x75, 0xb3, 0xe1, 0x12, + 0x6d, 0xfd, 0xd7, 0x0e, 0x27, 0x70, 0xb4, 0xfb, 0xf2, 0xff, 0xd2, 0xdf, 0xf7, 0x60, 0x91, 0xe5, + 0x9c, 0xe2, 0x23, 0x28, 0xdf, 0x27, 0x7c, 0x7c, 0xab, 0xbb, 0x51, 0x86, 0x68, 0xe5, 0x66, 0x36, + 0xe7, 0x34, 0x93, 0x23, 0x74, 0x89, 0xb6, 0xbc, 0x37, 0x60, 0xff, 0x24, 0x11, 0xfe, 0x06, 0xca, + 0xd9, 0x52, 0xf4, 0xaa, 0x3e, 0x75, 0xb4, 0x5d, 0x80, 0x48, 0x4c, 0x94, 0xdb, 0xfb, 0xdb, 0x80, + 0x97, 0xaa, 0xa0, 0x21, 0x5b, 0x66, 0x63, 0x2a, 0x18, 0xbc, 0xa3, 0xab, 0x7c, 0x91, 0x8c, 0xe9, + 0x9a, 0xc1, 0xb5, 0x2d, 0x8a, 0xc9, 0x6f, 0x93, 0x6c, 0xa2, 0x5f, 0x55, 0x06, 0xfe, 0x01, 0xaa, + 0x92, 0x49, 0x1e, 0xf3, 0xd5, 0x82, 0x4a, 0x0e, 0xf7, 0xda, 0x47, 0x9b, 0xa5, 0x92, 0x3c, 0xf1, + 0x68, 0xb5, 0xa0, 0x04, 0x78, 0x81, 0x77, 0x37, 0xd1, 0x7a, 0xc6, 0x26, 0x6e, 0xe6, 0x57, 0xde, + 0x99, 0xdf, 0x69, 0x41, 0x86, 0xad, 0xb3, 0x6c, 0xf5, 0xaa, 0xe8, 0x28, 0x08, 0xba, 0x02, 0x87, + 0xb0, 0x27, 0x29, 0x53, 0xd8, 0x03, 0x7b, 0x44, 0x6f, 0x58, 0x46, 0xf5, 0x90, 0x40, 0x8b, 0x18, + 0x61, 0x4f, 0x44, 0x7b, 0x70, 0x1d, 0xca, 0xc9, 0xcd, 0x9a, 0xe7, 0xdd, 0x10, 0xe5, 0xf0, 0x7e, + 0x33, 0xc0, 0x7e, 0xaf, 0x12, 0x9e, 0x82, 0x25, 0x19, 0x50, 0xa2, 0x76, 0xbc, 0x5d, 0x86, 0x8a, + 0x90, 0x1c, 0xc8, 0x18, 0xa1, 0xe8, 0x53, 0x3e, 0x5b, 0x33, 0x29, 0xb1, 0xd0, 0xa6, 0xc9, 0x44, + 0x69, 0x93, 0x4b, 0x04, 0xc4, 0x6f, 0x01, 0x32, 0xf6, 0x14, 0x4b, 0x29, 0xcd, 0x6b, 0x96, 0x1c, + 0xe5, 0xd1, 0xce, 0x28, 0x75, 0x33, 0xc4, 0xcd, 0x34, 0xca, 0xbd, 0x9f, 0x61, 0xef, 0xbd, 0xda, + 0xd0, 0xe7, 0x7c, 0x15, 0xa7, 0x3b, 0xab, 0xf4, 0x65, 0xf6, 0x7e, 0x84, 0xfd, 0x22, 0xb3, 0xde, + 0xfa, 0x06, 0x94, 0x65, 0x75, 0x7a, 0xcf, 0xf0, 0xe7, 0x4d, 0x13, 0x15, 0x70, 0xfa, 0xab, 0x01, + 0xb0, 0xa1, 0x01, 0x57, 0xa1, 0x72, 0xdd, 0xbf, 0xe8, 0x0f, 0x3e, 0xf4, 0xd1, 0x0b, 0xec, 0x80, + 0x75, 0x1e, 0xf5, 0xba, 0xc8, 0xc0, 0x2e, 0x94, 0x95, 0xd0, 0x96, 0x84, 0x4a, 0x6a, 0x95, 0x35, + 0x85, 0x04, 0x17, 0x12, 0x6b, 0xe1, 0x0a, 0x98, 0x85, 0x90, 0x6a, 0xe5, 0xb4, 0x05, 0xd6, 0xb2, + 0x59, 0x11, 0x58, 0x6b, 0xa6, 0x23, 0x82, 0x85, 0xd2, 0xba, 0x02, 0x90, 0xc1, 0x07, 0x04, 0xfe, + 0xb7, 0x1f, 0x5f, 0x3f, 0xce, 0x38, 0xcd, 0xf3, 0xe6, 0x8c, 0xb5, 0x14, 0x6a, 0x4d, 0x59, 0xeb, + 0x91, 0xb7, 0xe4, 0x0f, 0xb9, 0xb5, 0x69, 0x61, 0x64, 0xcb, 0x93, 0xb7, 0xff, 0x04, 0x00, 0x00, + 0xff, 0xff, 0x70, 0xe7, 0x5c, 0xa7, 0xdf, 0x07, 0x00, 0x00, } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e698969816..3de0ad5aa7 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -46,6 +46,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/tableacl" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vterrors" @@ -60,6 +61,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer" "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" + "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" @@ -170,6 +172,7 @@ type TabletServer struct { hr *heartbeat.Reader messager *messager.Engine watcher *ReplicationWatcher + vstreamer *vstreamer.Engine updateStreamList *binlog.StreamList // checkMySQLThrottler is used to throttle the number of @@ -238,6 +241,7 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer) tsv.messager = messager.NewEngine(tsv, tsv.se, config) tsv.watcher = NewReplicationWatcher(tsv.se, config) + tsv.vstreamer = vstreamer.NewEngine(srvtopo.NewResilientServer(topoServer, "TabletSrvTopo"), tsv.se) tsv.updateStreamList = &binlog.StreamList{} // FIXME(alainjobart) could we move this to the Register method below? // So that vtcombo doesn't even call it once, on the first tablet. @@ -348,6 +352,7 @@ func (tsv *TabletServer) InitDBConfig(target querypb.Target, dbcfgs *dbconfigs.D tsv.hr.InitDBConfig(tsv.dbconfigs) tsv.messager.InitDBConfig(tsv.dbconfigs) tsv.watcher.InitDBConfig(tsv.dbconfigs) + tsv.vstreamer.InitDBConfig(tsv.dbconfigs) return nil } @@ -513,6 +518,7 @@ func (tsv *TabletServer) fullStart() (err error) { } tsv.hr.Init(tsv.target) tsv.updateStreamList.Init() + tsv.vstreamer.Open(tsv.target.Keyspace, tsv.alias.Cell) return tsv.serveNewType() } @@ -577,6 +583,7 @@ func (tsv *TabletServer) StopService() { tsv.se.Close() tsv.hw.Close() tsv.hr.Close() + tsv.vstreamer.Close() log.Infof("Shutdown complete.") tsv.transition(StateNotConnected) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 50dc5e4a4d..8e2b057ed4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -23,6 +23,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" @@ -37,6 +38,9 @@ var vschemaErrors *stats.Counter // Engine is the engine for handling vseplication streaming requests. type Engine struct { + // cp is initialized by InitDBConfig + cp *mysql.ConnParams + // mu protects isOpen, streamers, streamIdx and kschema. mu sync.Mutex @@ -69,6 +73,11 @@ func NewEngine(ts srvtopo.Server, se *schema.Engine) *Engine { } } +// InitDBConfig performs saves the required info from dbconfigs for future use. +func (vse *Engine) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) { + vse.cp = dbcfgs.DbaWithDB() +} + // Open starts the Engine service. func (vse *Engine) Open(keyspace, cell string) error { vse.mu.Lock() @@ -103,7 +112,7 @@ func (vse *Engine) Close() { } // Stream starts a new stream. -func (vse *Engine) Stream(ctx context.Context, cp *mysql.ConnParams, startPos mysql.Position, filter *binlogdatapb.Filter, send func(*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos mysql.Position, filter *binlogdatapb.Filter, send func(*binlogdatapb.VEvent) error) error { // Ensure kschema is initialized and the watcher is started. vse.watcherOnce.Do(vse.setWatch) @@ -114,7 +123,7 @@ func (vse *Engine) Stream(ctx context.Context, cp *mysql.ConnParams, startPos my if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - streamer := newVStreamer(ctx, cp, vse.se, startPos, filter, vse.kschema, send) + streamer := newVStreamer(ctx, vse.cp, vse.se, startPos, filter, vse.kschema, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 24ccbd40f0..f4c9f6c6c2 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -93,6 +93,8 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error if plan.Vindex == nil { return true, result, nil } + + // Filter by Vindex. destinations, err := plan.Vindex.Map(nil, []sqltypes.Value{result[plan.VindexColumn]}) if err != nil { return false, nil, err @@ -154,6 +156,8 @@ func buildREPlan(ti *Table, kschema *vindexes.KeyspaceSchema, filter string) (*P if len(table.ColumnVindexes) == 0 { return nil, fmt.Errorf("table %s has no primary vindex", ti.Name) } + // findColumn can be used here because result column list is same + // as source. colnum, err := findColumn(ti, table.ColumnVindexes[0].Columns[0]) if err != nil { return nil, err @@ -208,11 +212,22 @@ func buildTablePlan(ti *Table, kschema *vindexes.KeyspaceSchema, query string) ( } plan.ColExprs = append(plan.ColExprs, cExpr) } + } else { + if len(sel.SelectExprs) != 1 { + return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel)) + } + plan.ColExprs = make([]ColExpr, len(ti.Columns)) + for i, col := range ti.Columns { + plan.ColExprs[i].ColNum = i + plan.ColExprs[i].Alias = col.Name + } } if sel.Where == nil { return plan, nil } + + // Filter by Vindex. funcExpr, ok := sel.Where.Expr.(*sqlparser.FuncExpr) if !ok { return nil, fmt.Errorf("unexpected where clause: %v", sqlparser.String(sel.Where)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/streamer.go b/go/vt/vttablet/tabletserver/vstreamer/streamer.go index f31ebe4b72..d2291a5a5e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/streamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/streamer.go @@ -136,11 +136,17 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog if !ok { return fmt.Errorf("server EOF") } - if err := vs.parseEvent(ev); err != nil { + vevents, err := vs.parseEvent(ev) + if err != nil { return err } + for _, vevent := range vevents { + if err := vs.send(vevent); err != nil { + return fmt.Errorf("error sending event: %v", err) + } + } case vs.kschema = <-vs.kevents: - if err := vs.updatePlans(); err != nil { + if err := vs.rebuildPlans(); err != nil { return err } case <-ctx.Done(): @@ -149,10 +155,10 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } -func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error { +func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { // Validate the buffer before reading fields from it. if !ev.IsValid() { - return fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) + return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } // We need to keep checking for FORMAT_DESCRIPTION_EVENT even after we've @@ -162,9 +168,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error { var err error vs.format, err = ev.Format() if err != nil { - return fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev) + return nil, fmt.Errorf("can't parse FORMAT_DESCRIPTION_EVENT: %v, event data: %#v", err, ev) } - return nil + return nil, nil } // We can't parse anything until we get a FORMAT_DESCRIPTION_EVENT that @@ -174,82 +180,70 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error { // is a fake ROTATE_EVENT, which the master sends to tell us the name // of the current log file. if ev.IsRotate() { - return nil + return nil, nil } - return fmt.Errorf("got a real event before FORMAT_DESCRIPTION_EVENT: %#v", ev) + return nil, fmt.Errorf("got a real event before FORMAT_DESCRIPTION_EVENT: %#v", ev) } // Strip the checksum, if any. We don't actually verify the checksum, so discard it. ev, _, err := ev.StripChecksum(vs.format) if err != nil { - return fmt.Errorf("can't strip checksum from binlog event: %v, event data: %#v", err, ev) + return nil, fmt.Errorf("can't strip checksum from binlog event: %v, event data: %#v", err, ev) } + var vevents []*binlogdatapb.VEvent switch { case ev.IsPseudo() || ev.IsGTID(): gtid, hasBegin, err := ev.GTID(vs.format) if err != nil { - return fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) - } - vs.pos = mysql.AppendGTID(vs.pos, gtid) - vevent := &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_GTID, - Gtid: mysql.EncodePosition(vs.pos), - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending GTID: %v", err) + return nil, fmt.Errorf("can't get GTID from binlog event: %v, event data: %#v", err, ev) } if hasBegin { - vevent := &binlogdatapb.VEvent{ + vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_BEGIN, - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending BEGIN: %v", err) - } + }) } + vs.pos = mysql.AppendGTID(vs.pos, gtid) + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_GTID, + Gtid: mysql.EncodePosition(vs.pos), + }) case ev.IsXID(): - vevent := &binlogdatapb.VEvent{ + vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_COMMIT, - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending COMMIT: %v", err) - } + }) case ev.IsQuery(): q, err := ev.Query(vs.format) if err != nil { - return fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev) + return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev) } - var vevent *binlogdatapb.VEvent switch cat := getStatementCategory(q.SQL); cat { case binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_ROLLBACK: - vevent = &binlogdatapb.VEvent{ + vevents = append(vevents, &binlogdatapb.VEvent{ Type: cat, - } + }) case binlogdatapb.VEventType_DDL: - vevent = &binlogdatapb.VEvent{ + vevents = append(vevents, &binlogdatapb.VEvent{ Type: cat, Ddl: q.SQL, - } + }) default: - return fmt.Errorf("unexpected event type %v in row-based replication: %#v", cat, ev) - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending COMMIT: %v", err) + return nil, fmt.Errorf("unexpected event type %v in row-based replication: %#v", cat, ev) } case ev.IsTableMap(): id := ev.TableID(vs.format) tm, err := ev.TableMap(vs.format) if err != nil { - return err + return nil, err } if tm.Database != "" && tm.Database != vs.cp.DbName { - return nil + return nil, nil } ti := vs.se.GetTable(sqlparser.NewTableIdent(tm.Name)) if ti == nil { - return fmt.Errorf("unknown table %v in schema", tm.Name) + return nil, fmt.Errorf("unknown table %v in schema", tm.Name) } if len(ti.Columns) < len(tm.Types) { - return fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(ti.Columns), ev) + return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(ti.Columns), ev) } table := &Table{ TableMap: tm, @@ -257,88 +251,35 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error { } plan, err := buildPlan(table, vs.kschema, vs.filter) if err != nil { - return err + return nil, err } if plan != nil { vs.plans[id] = plan } - case ev.IsWriteRows(): + case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows(): + // The existence of before and after images can be used to + // identify statememt types. It's also possible that the + // before and after images end up going to different shards. + // If so, an update will be treated as delete on one shard + // and insert on the other. id := ev.TableID(vs.format) plan, ok := vs.plans[id] if !ok { - return nil + return nil, nil } rows, err := ev.Rows(vs.format, plan.Table.TableMap) if err != nil { - return err - } - rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows)) - for _, row := range rows.Rows { - ok, values, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) - if err != nil { - return err - } - if !ok { - continue - } - encoded := sqltypes.RowToProto3(values) - rowEvents = append(rowEvents, &binlogdatapb.RowEvent{After: encoded}) - } - vevent := &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_INSERT, - RowEvents: rowEvents, - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending INSERT: %v", err) - } - case ev.IsDeleteRows(): - id := ev.TableID(vs.format) - plan, ok := vs.plans[id] - if !ok { - return nil - } - rows, err := ev.Rows(vs.format, plan.Table.TableMap) - if err != nil { - return err - } - rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows)) - for _, row := range rows.Rows { - ok, values, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) - if err != nil { - return err - } - if !ok { - continue - } - encoded := sqltypes.RowToProto3(values) - rowEvents = append(rowEvents, &binlogdatapb.RowEvent{Before: encoded}) - } - vevent := &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_DELETE, - RowEvents: rowEvents, - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending DELETE: %v", err) - } - case ev.IsUpdateRows(): - id := ev.TableID(vs.format) - plan, ok := vs.plans[id] - if !ok { - return nil - } - rows, err := ev.Rows(vs.format, plan.Table.TableMap) - if err != nil { - return err + return nil, err } rowEvents := make([]*binlogdatapb.RowEvent, 0, len(rows.Rows)) for _, row := range rows.Rows { beforeOK, beforeValues, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns) if err != nil { - return err + return nil, err } afterOK, afterValues, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns) if err != nil { - return err + return nil, err } if !beforeOK && !afterOK { continue @@ -352,18 +293,15 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) error { } rowEvents = append(rowEvents, rowEvent) } - vevent := &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_UPDATE, + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_ROW, RowEvents: rowEvents, - } - if err := vs.send(vevent); err != nil { - return fmt.Errorf("error sending UPDATE: %v", err) - } + }) } - return nil + return vevents, nil } -func (vs *vstreamer) updatePlans() error { +func (vs *vstreamer) rebuildPlans() error { for id, plan := range vs.plans { newPlan, err := buildPlan(plan.Table, vs.kschema, vs.filter) if err != nil { @@ -375,12 +313,15 @@ func (vs *vstreamer) updatePlans() error { } func (vs *vstreamer) extractRowAndFilter(plan *Plan, data []byte, dataColumns, nullColumns mysql.Bitmap) (bool, []sqltypes.Value, error) { + if len(data) == 0 { + return false, nil, nil + } values := make([]sqltypes.Value, dataColumns.Count()) valueIndex := 0 pos := 0 for colNum := 0; colNum < dataColumns.Count(); colNum++ { if !dataColumns.Bit(colNum) { - continue + return false, nil, fmt.Errorf("partial row image encountered: ensure binlog_row_image is set to 'full'") } if nullColumns.Bit(valueIndex) { valueIndex++ diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 1a2ab44509..b75d94fa03 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -156,6 +156,8 @@ message BinlogSource { } // VEventType enumerates the event types. +// This list is comprehensive. Many of these types +// will not be encountered in RBR mode. enum VEventType { UNKNOWN = 0; GTID = 1; @@ -167,6 +169,7 @@ enum VEventType { UPDATE = 7; DELETE = 8; SET = 9; + ROW = 10; } message RowEvent { diff --git a/py/vtproto/binlogdata_pb2.py b/py/vtproto/binlogdata_pb2.py index b9c3737957..e3d91c969f 100644 --- a/py/vtproto/binlogdata_pb2.py +++ b/py/vtproto/binlogdata_pb2.py @@ -22,7 +22,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( package='binlogdata', syntax='proto3', serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'), - serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"A\n\x08RowEvent\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"s\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12(\n\nrow_events\x18\x04 \x03(\x0b\x32\x14.binlogdata.RowEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*~\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\n\n\x06UPDATE\x10\x07\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x07\n\x03SET\x10\tB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') + serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"A\n\x08RowEvent\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"s\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12(\n\nrow_events\x18\x04 \x03(\x0b\x32\x14.binlogdata.RowEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*\x87\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\n\n\x06UPDATE\x10\x07\x12\n\n\x06\x44\x45LETE\x10\x08\x12\x07\n\x03SET\x10\t\x12\x07\n\x03ROW\x10\nB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3') , dependencies=[query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,]) @@ -72,11 +72,15 @@ _VEVENTTYPE = _descriptor.EnumDescriptor( name='SET', index=9, number=9, serialized_options=None, type=None), + _descriptor.EnumValueDescriptor( + name='ROW', index=10, number=10, + serialized_options=None, + type=None), ], containing_type=None, serialized_options=None, - serialized_start=1517, - serialized_end=1643, + serialized_start=1518, + serialized_end=1653, ) _sym_db.RegisterEnumDescriptor(_VEVENTTYPE) @@ -91,6 +95,7 @@ INSERT = 6 UPDATE = 7 DELETE = 8 SET = 9 +ROW = 10 _BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor(