Also had to added transmission of field info, which may come
in handy for encoding the values on the player end.

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-01-01 23:24:40 -08:00
Родитель 6b11903709
Коммит 6ff51d39cf
9 изменённых файлов: 955 добавлений и 120 удалений

Просмотреть файл

@ -39,6 +39,7 @@ const (
VEventType_SET VEventType = 10
VEventType_OTHER VEventType = 11
VEventType_ROW VEventType = 12
VEventType_FIELD VEventType = 13
)
var VEventType_name = map[int32]string{
@ -55,6 +56,7 @@ var VEventType_name = map[int32]string{
10: "SET",
11: "OTHER",
12: "ROW",
13: "FIELD",
}
var VEventType_value = map[string]int32{
"UNKNOWN": 0,
@ -70,13 +72,14 @@ var VEventType_value = map[string]int32{
"SET": 10,
"OTHER": 11,
"ROW": 12,
"FIELD": 13,
}
func (x VEventType) String() string {
return proto.EnumName(VEventType_name, int32(x))
}
func (VEventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{0}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{0}
}
type BinlogTransaction_Statement_Category int32
@ -124,7 +127,7 @@ func (x BinlogTransaction_Statement_Category) String() string {
return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x))
}
func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{1, 0, 0}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{1, 0, 0}
}
// Charset is the per-statement charset info from a QUERY_EVENT binlog entry.
@ -144,7 +147,7 @@ func (m *Charset) Reset() { *m = Charset{} }
func (m *Charset) String() string { return proto.CompactTextString(m) }
func (*Charset) ProtoMessage() {}
func (*Charset) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{0}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{0}
}
func (m *Charset) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Charset.Unmarshal(m, b)
@ -201,7 +204,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} }
func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction) ProtoMessage() {}
func (*BinlogTransaction) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{1}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{1}
}
func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b)
@ -251,7 +254,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S
func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction_Statement) ProtoMessage() {}
func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{1, 0}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{1, 0}
}
func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b)
@ -309,7 +312,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} }
func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeRequest) ProtoMessage() {}
func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{2}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{2}
}
func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b)
@ -362,7 +365,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{}
func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeResponse) ProtoMessage() {}
func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{3}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{3}
}
func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b)
@ -406,7 +409,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} }
func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) }
func (*StreamTablesRequest) ProtoMessage() {}
func (*StreamTablesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{4}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{4}
}
func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b)
@ -459,7 +462,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} }
func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) }
func (*StreamTablesResponse) ProtoMessage() {}
func (*StreamTablesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{5}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{5}
}
func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b)
@ -504,7 +507,7 @@ func (m *Rule) Reset() { *m = Rule{} }
func (m *Rule) String() string { return proto.CompactTextString(m) }
func (*Rule) ProtoMessage() {}
func (*Rule) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{6}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{6}
}
func (m *Rule) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Rule.Unmarshal(m, b)
@ -551,7 +554,7 @@ func (m *Filter) Reset() { *m = Filter{} }
func (m *Filter) String() string { return proto.CompactTextString(m) }
func (*Filter) ProtoMessage() {}
func (*Filter) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{7}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{7}
}
func (m *Filter) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filter.Unmarshal(m, b)
@ -604,7 +607,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} }
func (m *BinlogSource) String() string { return proto.CompactTextString(m) }
func (*BinlogSource) ProtoMessage() {}
func (*BinlogSource) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{8}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{8}
}
func (m *BinlogSource) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogSource.Unmarshal(m, b)
@ -679,7 +682,7 @@ func (m *RowChange) Reset() { *m = RowChange{} }
func (m *RowChange) String() string { return proto.CompactTextString(m) }
func (*RowChange) ProtoMessage() {}
func (*RowChange) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{9}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{9}
}
func (m *RowChange) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RowChange.Unmarshal(m, b)
@ -726,7 +729,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} }
func (m *RowEvent) String() string { return proto.CompactTextString(m) }
func (*RowEvent) ProtoMessage() {}
func (*RowEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{10}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{10}
}
func (m *RowEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RowEvent.Unmarshal(m, b)
@ -760,22 +763,69 @@ func (m *RowEvent) GetRowChanges() []*RowChange {
return nil
}
type FieldEvent struct {
TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"`
Fields []*query.Field `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *FieldEvent) Reset() { *m = FieldEvent{} }
func (m *FieldEvent) String() string { return proto.CompactTextString(m) }
func (*FieldEvent) ProtoMessage() {}
func (*FieldEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{11}
}
func (m *FieldEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FieldEvent.Unmarshal(m, b)
}
func (m *FieldEvent) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_FieldEvent.Marshal(b, m, deterministic)
}
func (dst *FieldEvent) XXX_Merge(src proto.Message) {
xxx_messageInfo_FieldEvent.Merge(dst, src)
}
func (m *FieldEvent) XXX_Size() int {
return xxx_messageInfo_FieldEvent.Size(m)
}
func (m *FieldEvent) XXX_DiscardUnknown() {
xxx_messageInfo_FieldEvent.DiscardUnknown(m)
}
var xxx_messageInfo_FieldEvent proto.InternalMessageInfo
func (m *FieldEvent) GetTableName() string {
if m != nil {
return m.TableName
}
return ""
}
func (m *FieldEvent) GetFields() []*query.Field {
if m != nil {
return m.Fields
}
return nil
}
// VEvent represents a vstream event
type VEvent struct {
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
Gtid string `protobuf:"bytes,2,opt,name=gtid,proto3" json:"gtid,omitempty"`
Ddl string `protobuf:"bytes,3,opt,name=ddl,proto3" json:"ddl,omitempty"`
RowEvent *RowEvent `protobuf:"bytes,4,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
Gtid string `protobuf:"bytes,2,opt,name=gtid,proto3" json:"gtid,omitempty"`
Ddl string `protobuf:"bytes,3,opt,name=ddl,proto3" json:"ddl,omitempty"`
RowEvent *RowEvent `protobuf:"bytes,4,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
FieldEvent *FieldEvent `protobuf:"bytes,5,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *VEvent) Reset() { *m = VEvent{} }
func (m *VEvent) String() string { return proto.CompactTextString(m) }
func (*VEvent) ProtoMessage() {}
func (*VEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{11}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{12}
}
func (m *VEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VEvent.Unmarshal(m, b)
@ -823,6 +873,13 @@ func (m *VEvent) GetRowEvent() *RowEvent {
return nil
}
func (m *VEvent) GetFieldEvent() *FieldEvent {
if m != nil {
return m.FieldEvent
}
return nil
}
// VStreamRequest is the payload for VStream
type VStreamRequest struct {
Position string `protobuf:"bytes,1,opt,name=position,proto3" json:"position,omitempty"`
@ -836,7 +893,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} }
func (m *VStreamRequest) String() string { return proto.CompactTextString(m) }
func (*VStreamRequest) ProtoMessage() {}
func (*VStreamRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{12}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{13}
}
func (m *VStreamRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamRequest.Unmarshal(m, b)
@ -882,7 +939,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} }
func (m *VStreamResponse) String() string { return proto.CompactTextString(m) }
func (*VStreamResponse) ProtoMessage() {}
func (*VStreamResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_8c20ffb2cc31e3c4, []int{13}
return fileDescriptor_binlogdata_e1edbb575eea20d0, []int{14}
}
func (m *VStreamResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamResponse.Unmarshal(m, b)
@ -922,6 +979,7 @@ func init() {
proto.RegisterType((*BinlogSource)(nil), "binlogdata.BinlogSource")
proto.RegisterType((*RowChange)(nil), "binlogdata.RowChange")
proto.RegisterType((*RowEvent)(nil), "binlogdata.RowEvent")
proto.RegisterType((*FieldEvent)(nil), "binlogdata.FieldEvent")
proto.RegisterType((*VEvent)(nil), "binlogdata.VEvent")
proto.RegisterType((*VStreamRequest)(nil), "binlogdata.VStreamRequest")
proto.RegisterType((*VStreamResponse)(nil), "binlogdata.VStreamResponse")
@ -929,69 +987,72 @@ func init() {
proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value)
}
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_8c20ffb2cc31e3c4) }
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_e1edbb575eea20d0) }
var fileDescriptor_binlogdata_8c20ffb2cc31e3c4 = []byte{
// 964 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xcb, 0x6e, 0xe2, 0x56,
0x18, 0x1e, 0x63, 0x63, 0xec, 0xdf, 0x69, 0xe6, 0xe4, 0xe4, 0x22, 0x14, 0x69, 0xa4, 0xc8, 0x8b,
0x0e, 0x8d, 0x54, 0x98, 0xd2, 0xcb, 0xa6, 0xab, 0x00, 0x6e, 0xca, 0xc4, 0x81, 0xf4, 0xe0, 0xcc,
0x54, 0xb3, 0xb1, 0x0c, 0x9c, 0x10, 0x14, 0xb0, 0x89, 0x7d, 0x48, 0xca, 0x43, 0x74, 0xd5, 0x07,
0xe8, 0xba, 0x2f, 0xd2, 0x37, 0xe9, 0xae, 0x0f, 0x51, 0x9d, 0x8b, 0x0d, 0x64, 0xa4, 0x69, 0xba,
0xe8, 0xee, 0xbf, 0x9d, 0xff, 0xf2, 0xfd, 0xbf, 0x3f, 0x00, 0x34, 0x9c, 0xc6, 0xb3, 0x64, 0x32,
0x8e, 0x58, 0x54, 0x5f, 0xa4, 0x09, 0x4b, 0x30, 0xac, 0x2d, 0xc7, 0xce, 0xfd, 0x92, 0xa6, 0x2b,
0xe9, 0x38, 0xde, 0x65, 0xc9, 0x22, 0x59, 0x07, 0xba, 0x97, 0x50, 0x69, 0xdf, 0x46, 0x69, 0x46,
0x19, 0x3e, 0x02, 0x73, 0x34, 0x9b, 0xd2, 0x98, 0x55, 0xb5, 0x13, 0xad, 0x56, 0x26, 0x4a, 0xc3,
0x18, 0x8c, 0x51, 0x12, 0xc7, 0xd5, 0x92, 0xb0, 0x0a, 0x99, 0xc7, 0x66, 0x34, 0x7d, 0xa0, 0x69,
0x55, 0x97, 0xb1, 0x52, 0x73, 0xff, 0xd2, 0x61, 0xaf, 0x25, 0x4a, 0x07, 0x69, 0x14, 0x67, 0xd1,
0x88, 0x4d, 0x93, 0x18, 0x9f, 0x03, 0x64, 0x2c, 0x62, 0x74, 0x4e, 0x63, 0x96, 0x55, 0xb5, 0x13,
0xbd, 0xe6, 0x34, 0x5f, 0xd7, 0x37, 0x9a, 0xfe, 0xe8, 0x49, 0x7d, 0x90, 0xc7, 0x93, 0x8d, 0xa7,
0xb8, 0x09, 0x0e, 0x7d, 0xa0, 0x31, 0x0b, 0x59, 0x72, 0x47, 0xe3, 0xaa, 0x71, 0xa2, 0xd5, 0x9c,
0xe6, 0x5e, 0x5d, 0x0e, 0xe8, 0x71, 0x4f, 0xc0, 0x1d, 0x04, 0x68, 0x21, 0x1f, 0xff, 0x59, 0x02,
0xbb, 0xc8, 0x86, 0x7d, 0xb0, 0x46, 0x11, 0xa3, 0x93, 0x24, 0x5d, 0x89, 0x31, 0x77, 0x9b, 0x6f,
0x9e, 0xd9, 0x48, 0xbd, 0xad, 0xde, 0x91, 0x22, 0x03, 0xfe, 0x12, 0x2a, 0x23, 0x89, 0x9e, 0x40,
0xc7, 0x69, 0xee, 0x6f, 0x26, 0x53, 0xc0, 0x92, 0x3c, 0x06, 0x23, 0xd0, 0xb3, 0xfb, 0x99, 0x80,
0x6c, 0x87, 0x70, 0xd1, 0xfd, 0x43, 0x03, 0x2b, 0xcf, 0x8b, 0xf7, 0xe1, 0x65, 0xcb, 0x0f, 0xaf,
0x7b, 0xc4, 0x6b, 0xf7, 0xcf, 0x7b, 0xdd, 0x0f, 0x5e, 0x07, 0xbd, 0xc0, 0x3b, 0x60, 0xb5, 0xfc,
0xb0, 0xe5, 0x9d, 0x77, 0x7b, 0x48, 0xc3, 0x9f, 0x81, 0xdd, 0xf2, 0xc3, 0x76, 0xff, 0xf2, 0xb2,
0x1b, 0xa0, 0x12, 0x7e, 0x09, 0x4e, 0xcb, 0x0f, 0x49, 0xdf, 0xf7, 0x5b, 0x67, 0xed, 0x0b, 0xa4,
0xe3, 0x43, 0xd8, 0x6b, 0xf9, 0x61, 0xe7, 0xd2, 0x0f, 0x3b, 0xde, 0x15, 0xf1, 0xda, 0x67, 0x81,
0xd7, 0x41, 0x06, 0x06, 0x30, 0xb9, 0xb9, 0xe3, 0xa3, 0xb2, 0x92, 0x07, 0x5e, 0x80, 0x4c, 0x95,
0xae, 0xdb, 0x1b, 0x78, 0x24, 0x40, 0x15, 0xa5, 0x5e, 0x5f, 0x75, 0xce, 0x02, 0x0f, 0x59, 0x4a,
0xed, 0x78, 0xbe, 0x17, 0x78, 0xc8, 0x7e, 0x6b, 0x58, 0x25, 0xa4, 0xbf, 0x35, 0x2c, 0x1d, 0x19,
0xee, 0x6f, 0x1a, 0x1c, 0x0e, 0x58, 0x4a, 0xa3, 0xf9, 0x05, 0x5d, 0x91, 0x28, 0x9e, 0x50, 0x42,
0xef, 0x97, 0x34, 0x63, 0xf8, 0x18, 0xac, 0x45, 0x92, 0x4d, 0x39, 0x76, 0x02, 0x60, 0x9b, 0x14,
0x3a, 0x6e, 0x80, 0x7d, 0x47, 0x57, 0x61, 0xca, 0xe3, 0x15, 0x60, 0xb8, 0x5e, 0x1c, 0x64, 0x91,
0xc9, 0xba, 0x53, 0xd2, 0x26, 0xbe, 0xfa, 0xbf, 0xe3, 0xeb, 0xde, 0xc0, 0xd1, 0xd3, 0xa6, 0xb2,
0x45, 0x12, 0x67, 0x14, 0xfb, 0x80, 0xe5, 0xc3, 0x90, 0xad, 0x77, 0x2b, 0xfa, 0x73, 0x9a, 0xaf,
0x3e, 0x79, 0x00, 0x64, 0x6f, 0xf8, 0xd4, 0xe4, 0xfe, 0x02, 0xfb, 0xb2, 0x4e, 0x10, 0x0d, 0x67,
0x34, 0x7b, 0xce, 0xe8, 0x47, 0x60, 0x32, 0x11, 0x5c, 0x2d, 0x9d, 0xe8, 0x35, 0x9b, 0x28, 0xed,
0xbf, 0x4e, 0x38, 0x86, 0x83, 0xed, 0xca, 0xff, 0xcb, 0x7c, 0xdf, 0x80, 0x41, 0x96, 0x33, 0x8a,
0x0f, 0xa0, 0x3c, 0x8f, 0xd8, 0xe8, 0x56, 0x4d, 0x23, 0x15, 0x3e, 0xca, 0xcd, 0x74, 0xc6, 0x68,
0x2a, 0x56, 0x68, 0x13, 0xa5, 0xb9, 0x6f, 0xc0, 0xfc, 0x41, 0x48, 0xf8, 0x73, 0x28, 0xa7, 0x4b,
0x3e, 0xab, 0xfc, 0xd4, 0xd1, 0x66, 0x03, 0x3c, 0x31, 0x91, 0x6e, 0xf7, 0x6f, 0x0d, 0x76, 0x64,
0x43, 0x83, 0x64, 0x99, 0x8e, 0x28, 0x47, 0xf0, 0x8e, 0xae, 0xb2, 0x45, 0x34, 0xa2, 0x39, 0x82,
0xb9, 0xce, 0x9b, 0xc9, 0x6e, 0xa3, 0x74, 0xac, 0xaa, 0x4a, 0x05, 0x7f, 0x0b, 0x8e, 0x40, 0x92,
0x85, 0x6c, 0xb5, 0xa0, 0x02, 0xc3, 0xdd, 0xe6, 0xc1, 0xfa, 0xa8, 0x04, 0x4e, 0x2c, 0x58, 0x2d,
0x28, 0x01, 0x56, 0xc8, 0xdb, 0x97, 0x68, 0x3c, 0xe3, 0x12, 0xd7, 0xfb, 0x2b, 0x6f, 0xed, 0xef,
0xb4, 0x00, 0xc3, 0x54, 0x59, 0x36, 0x66, 0x95, 0x70, 0x14, 0x00, 0xfd, 0x04, 0x36, 0x49, 0x1e,
0xdb, 0xb7, 0x22, 0xa1, 0x0b, 0xe6, 0x90, 0xde, 0x24, 0x29, 0x55, 0x5b, 0x02, 0xc5, 0x62, 0x24,
0x79, 0x24, 0xca, 0x83, 0x4f, 0xa0, 0x1c, 0xdd, 0xe4, 0x40, 0x6f, 0x87, 0x48, 0x87, 0x1b, 0x81,
0x45, 0x92, 0x47, 0xc1, 0x7c, 0xf8, 0x15, 0xc8, 0x09, 0xc3, 0x38, 0x9a, 0xe7, 0xf0, 0xd9, 0xc2,
0xd2, 0x8b, 0xe6, 0x14, 0x7f, 0x07, 0x4e, 0x9a, 0x3c, 0x86, 0x23, 0x51, 0x5e, 0x9e, 0xa1, 0xd3,
0x3c, 0xdc, 0x5a, 0x4d, 0xde, 0x1c, 0x81, 0x34, 0x17, 0x33, 0xf7, 0x57, 0x0d, 0xcc, 0x77, 0xb2,
0xc2, 0x29, 0x18, 0x02, 0x65, 0x49, 0x9c, 0x47, 0x9b, 0x6f, 0x65, 0x84, 0xc0, 0x59, 0xc4, 0xf0,
0x5f, 0x8d, 0x09, 0x9b, 0xe6, 0xdb, 0x12, 0x32, 0xe7, 0xbf, 0xf1, 0x58, 0xf2, 0x9f, 0x4d, 0xb8,
0x88, 0xbf, 0x02, 0x9b, 0x37, 0x25, 0xe8, 0x5a, 0xed, 0xe1, 0xe0, 0x49, 0x4b, 0x22, 0x31, 0xb1,
0x52, 0x25, 0xb9, 0x3f, 0xc3, 0xee, 0x3b, 0xf9, 0x0d, 0x3c, 0xe7, 0xbb, 0x3b, 0xdd, 0x3a, 0xd6,
0x4f, 0xef, 0xe7, 0x7b, 0x78, 0x59, 0x64, 0x56, 0xdf, 0x55, 0x0d, 0xca, 0xb2, 0x37, 0x79, 0xc9,
0xf8, 0xe3, 0x91, 0x89, 0x0c, 0x38, 0xfd, 0x5d, 0x03, 0x58, 0x83, 0x80, 0x1d, 0xa8, 0x5c, 0xf7,
0x2e, 0x7a, 0xfd, 0xf7, 0x3d, 0xf4, 0x02, 0x5b, 0x60, 0x9c, 0x07, 0xdd, 0x0e, 0xd2, 0xb0, 0x0d,
0x65, 0x49, 0xe5, 0x25, 0xce, 0xc3, 0x8a, 0xc7, 0x75, 0x4e, 0xf2, 0x05, 0x89, 0x1b, 0xb8, 0x02,
0x7a, 0x41, 0xd5, 0x8a, 0x9b, 0x4d, 0x9e, 0x90, 0x78, 0x57, 0xfe, 0x59, 0xdb, 0x43, 0x15, 0xee,
0x28, 0x58, 0x1a, 0xc0, 0xcc, 0x29, 0x9a, 0xbf, 0xe4, 0xc4, 0x0e, 0xbc, 0x4e, 0x3f, 0xf8, 0xd1,
0x23, 0xc8, 0xe1, 0x36, 0xd2, 0x7f, 0x8f, 0x76, 0x5a, 0x5f, 0x7c, 0x78, 0xfd, 0x30, 0x65, 0x34,
0xcb, 0xea, 0xd3, 0xa4, 0x21, 0xa5, 0xc6, 0x24, 0x69, 0x3c, 0xb0, 0x86, 0xf8, 0x2b, 0xd0, 0x58,
0x8f, 0x36, 0x34, 0x85, 0xe5, 0xeb, 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0xbd, 0x1a, 0xf9, 0xf4,
0x59, 0x08, 0x00, 0x00,
var fileDescriptor_binlogdata_e1edbb575eea20d0 = []byte{
// 1017 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdb, 0x6e, 0xdb, 0x46,
0x13, 0x0e, 0x45, 0x8a, 0x22, 0x87, 0x8e, 0xbd, 0x5e, 0x1f, 0x20, 0x18, 0x08, 0x60, 0x10, 0x3f,
0xfe, 0xb8, 0x06, 0x2a, 0xa5, 0xea, 0xe9, 0xa2, 0x57, 0x96, 0x44, 0xbb, 0x8a, 0x69, 0xc9, 0x59,
0xd3, 0x49, 0x91, 0x1b, 0x82, 0x96, 0xd6, 0xb2, 0x60, 0x89, 0x94, 0xc9, 0x95, 0x5d, 0x3d, 0x47,
0x9f, 0xa2, 0x7d, 0x90, 0xbc, 0x49, 0xef, 0xfa, 0x10, 0xc5, 0x1e, 0x48, 0x49, 0x0e, 0x90, 0xaa,
0x17, 0xbd, 0x9b, 0xd3, 0x7e, 0x3b, 0xf3, 0xcd, 0x70, 0x87, 0x80, 0x6e, 0x46, 0xf1, 0x38, 0x19,
0x0e, 0x22, 0x16, 0xd5, 0xa6, 0x69, 0xc2, 0x12, 0x0c, 0x0b, 0xcb, 0x81, 0xf3, 0x30, 0xa3, 0xe9,
0x5c, 0x3a, 0x0e, 0x36, 0x59, 0x32, 0x4d, 0x16, 0x81, 0xee, 0x05, 0x54, 0x5a, 0x77, 0x51, 0x9a,
0x51, 0x86, 0xf7, 0xc1, 0xec, 0x8f, 0x47, 0x34, 0x66, 0x55, 0xed, 0x50, 0x3b, 0x2a, 0x13, 0xa5,
0x61, 0x0c, 0x46, 0x3f, 0x89, 0xe3, 0x6a, 0x49, 0x58, 0x85, 0xcc, 0x63, 0x33, 0x9a, 0x3e, 0xd2,
0xb4, 0xaa, 0xcb, 0x58, 0xa9, 0xb9, 0x7f, 0xea, 0xb0, 0xdd, 0x14, 0x57, 0x07, 0x69, 0x14, 0x67,
0x51, 0x9f, 0x8d, 0x92, 0x18, 0x9f, 0x01, 0x64, 0x2c, 0x62, 0x74, 0x42, 0x63, 0x96, 0x55, 0xb5,
0x43, 0xfd, 0xc8, 0x69, 0xbc, 0xae, 0x2d, 0x25, 0xfd, 0xd9, 0x91, 0xda, 0x55, 0x1e, 0x4f, 0x96,
0x8e, 0xe2, 0x06, 0x38, 0xf4, 0x91, 0xc6, 0x2c, 0x64, 0xc9, 0x3d, 0x8d, 0xab, 0xc6, 0xa1, 0x76,
0xe4, 0x34, 0xb6, 0x6b, 0xb2, 0x40, 0x8f, 0x7b, 0x02, 0xee, 0x20, 0x40, 0x0b, 0xf9, 0xe0, 0x53,
0x09, 0xec, 0x02, 0x0d, 0xfb, 0x60, 0xf5, 0x23, 0x46, 0x87, 0x49, 0x3a, 0x17, 0x65, 0x6e, 0x36,
0xde, 0xac, 0x99, 0x48, 0xad, 0xa5, 0xce, 0x91, 0x02, 0x01, 0x7f, 0x0d, 0x95, 0xbe, 0x64, 0x4f,
0xb0, 0xe3, 0x34, 0x76, 0x96, 0xc1, 0x14, 0xb1, 0x24, 0x8f, 0xc1, 0x08, 0xf4, 0xec, 0x61, 0x2c,
0x28, 0xdb, 0x20, 0x5c, 0x74, 0x7f, 0xd7, 0xc0, 0xca, 0x71, 0xf1, 0x0e, 0x6c, 0x35, 0xfd, 0xf0,
0xba, 0x4b, 0xbc, 0x56, 0xef, 0xac, 0xdb, 0xf9, 0xe8, 0xb5, 0xd1, 0x0b, 0xbc, 0x01, 0x56, 0xd3,
0x0f, 0x9b, 0xde, 0x59, 0xa7, 0x8b, 0x34, 0xfc, 0x12, 0xec, 0xa6, 0x1f, 0xb6, 0x7a, 0x17, 0x17,
0x9d, 0x00, 0x95, 0xf0, 0x16, 0x38, 0x4d, 0x3f, 0x24, 0x3d, 0xdf, 0x6f, 0x9e, 0xb4, 0xce, 0x91,
0x8e, 0xf7, 0x60, 0xbb, 0xe9, 0x87, 0xed, 0x0b, 0x3f, 0x6c, 0x7b, 0x97, 0xc4, 0x6b, 0x9d, 0x04,
0x5e, 0x1b, 0x19, 0x18, 0xc0, 0xe4, 0xe6, 0xb6, 0x8f, 0xca, 0x4a, 0xbe, 0xf2, 0x02, 0x64, 0x2a,
0xb8, 0x4e, 0xf7, 0xca, 0x23, 0x01, 0xaa, 0x28, 0xf5, 0xfa, 0xb2, 0x7d, 0x12, 0x78, 0xc8, 0x52,
0x6a, 0xdb, 0xf3, 0xbd, 0xc0, 0x43, 0xf6, 0x5b, 0xc3, 0x2a, 0x21, 0xfd, 0xad, 0x61, 0xe9, 0xc8,
0x70, 0x7f, 0xd3, 0x60, 0xef, 0x8a, 0xa5, 0x34, 0x9a, 0x9c, 0xd3, 0x39, 0x89, 0xe2, 0x21, 0x25,
0xf4, 0x61, 0x46, 0x33, 0x86, 0x0f, 0xc0, 0x9a, 0x26, 0xd9, 0x88, 0x73, 0x27, 0x08, 0xb6, 0x49,
0xa1, 0xe3, 0x3a, 0xd8, 0xf7, 0x74, 0x1e, 0xa6, 0x3c, 0x5e, 0x11, 0x86, 0x6b, 0xc5, 0x40, 0x16,
0x48, 0xd6, 0xbd, 0x92, 0x96, 0xf9, 0xd5, 0xff, 0x99, 0x5f, 0xf7, 0x16, 0xf6, 0x9f, 0x27, 0x95,
0x4d, 0x93, 0x38, 0xa3, 0xd8, 0x07, 0x2c, 0x0f, 0x86, 0x6c, 0xd1, 0x5b, 0x91, 0x9f, 0xd3, 0x78,
0xf5, 0xc5, 0x01, 0x20, 0xdb, 0x37, 0xcf, 0x4d, 0xee, 0xaf, 0xb0, 0x23, 0xef, 0x09, 0xa2, 0x9b,
0x31, 0xcd, 0xd6, 0x29, 0x7d, 0x1f, 0x4c, 0x26, 0x82, 0xab, 0xa5, 0x43, 0xfd, 0xc8, 0x26, 0x4a,
0xfb, 0xb7, 0x15, 0x0e, 0x60, 0x77, 0xf5, 0xe6, 0xff, 0xa4, 0xbe, 0xef, 0xc0, 0x20, 0xb3, 0x31,
0xc5, 0xbb, 0x50, 0x9e, 0x44, 0xac, 0x7f, 0xa7, 0xaa, 0x91, 0x0a, 0x2f, 0xe5, 0x76, 0x34, 0x66,
0x34, 0x15, 0x2d, 0xb4, 0x89, 0xd2, 0xdc, 0x37, 0x60, 0x9e, 0x0a, 0x09, 0xff, 0x1f, 0xca, 0xe9,
0x8c, 0xd7, 0x2a, 0x3f, 0x75, 0xb4, 0x9c, 0x00, 0x07, 0x26, 0xd2, 0xed, 0xfe, 0xa5, 0xc1, 0x86,
0x4c, 0xe8, 0x2a, 0x99, 0xa5, 0x7d, 0xca, 0x19, 0xbc, 0xa7, 0xf3, 0x6c, 0x1a, 0xf5, 0x69, 0xce,
0x60, 0xae, 0xf3, 0x64, 0xb2, 0xbb, 0x28, 0x1d, 0xa8, 0x5b, 0xa5, 0x82, 0xbf, 0x07, 0x47, 0x30,
0xc9, 0x42, 0x36, 0x9f, 0x52, 0xc1, 0xe1, 0x66, 0x63, 0x77, 0x31, 0x54, 0x82, 0x27, 0x16, 0xcc,
0xa7, 0x94, 0x00, 0x2b, 0xe4, 0xd5, 0x49, 0x34, 0xd6, 0x98, 0xc4, 0x45, 0xff, 0xca, 0x2b, 0xfd,
0x3b, 0x2e, 0xc8, 0x30, 0x15, 0xca, 0x52, 0xad, 0x92, 0x8e, 0x82, 0xa0, 0x77, 0x60, 0x93, 0xe4,
0xa9, 0x75, 0x27, 0x00, 0x5d, 0x30, 0x6f, 0xe8, 0x6d, 0x92, 0x52, 0xd5, 0x25, 0x50, 0xaf, 0x18,
0x49, 0x9e, 0x88, 0xf2, 0xe0, 0x43, 0x28, 0x47, 0xb7, 0x39, 0xd1, 0xab, 0x21, 0xd2, 0xe1, 0x46,
0x60, 0x91, 0xe4, 0x49, 0xbc, 0x7c, 0xf8, 0x15, 0xc8, 0x0a, 0xc3, 0x38, 0x9a, 0xe4, 0xf4, 0xd9,
0xc2, 0xd2, 0x8d, 0x26, 0x14, 0xff, 0x00, 0x4e, 0x9a, 0x3c, 0x85, 0x7d, 0x71, 0xbd, 0x1c, 0x43,
0xa7, 0xb1, 0xb7, 0xd2, 0x9a, 0x3c, 0x39, 0x02, 0x69, 0x2e, 0x66, 0xee, 0x3b, 0x80, 0xd3, 0x11,
0x1d, 0x0f, 0xd6, 0xba, 0xe4, 0x7f, 0x9c, 0x0e, 0x3a, 0x1e, 0xe4, 0xf8, 0x1b, 0x2a, 0x65, 0x81,
0x40, 0x94, 0xcf, 0xfd, 0xa4, 0x81, 0xf9, 0x5e, 0xe2, 0x1d, 0x83, 0x21, 0x1a, 0x27, 0xdf, 0xe2,
0xfd, 0xe5, 0x74, 0x64, 0x84, 0x68, 0x9d, 0x88, 0xe1, 0x8b, 0x68, 0xc8, 0x46, 0xf9, 0x00, 0x08,
0x99, 0x3f, 0xa9, 0x83, 0x81, 0x7c, 0x52, 0x6d, 0xc2, 0x45, 0xfc, 0x0d, 0xd8, 0xbc, 0x4e, 0xb1,
0x01, 0x54, 0x6b, 0x77, 0x9f, 0x55, 0x29, 0x80, 0x89, 0x95, 0xe6, 0xcc, 0xfd, 0x08, 0x8e, 0xc8,
0x4c, 0x1d, 0x2a, 0x8b, 0x43, 0xfb, 0xab, 0x9d, 0xcc, 0x19, 0x20, 0x70, 0x5b, 0xc8, 0xee, 0x2f,
0xb0, 0xf9, 0x5e, 0x7e, 0x8f, 0xeb, 0xbc, 0x01, 0xc7, 0x2b, 0x1f, 0xce, 0x97, 0x67, 0xe5, 0x27,
0xd8, 0x2a, 0x90, 0xd5, 0x37, 0x7e, 0x04, 0x65, 0x99, 0x9f, 0xfc, 0xaa, 0xf0, 0xe7, 0x5c, 0x11,
0x19, 0x70, 0xfc, 0x87, 0x06, 0xb0, 0x60, 0x0f, 0x3b, 0x50, 0xb9, 0xee, 0x9e, 0x77, 0x7b, 0x1f,
0xba, 0xe8, 0x05, 0xb6, 0xc0, 0x38, 0x0b, 0x3a, 0x6d, 0xa4, 0x61, 0x1b, 0xca, 0x72, 0xad, 0x94,
0xf8, 0x4e, 0x50, 0x3b, 0x45, 0xe7, 0x0b, 0xa7, 0x58, 0x28, 0x06, 0xae, 0x80, 0x5e, 0xac, 0x0d,
0xb5, 0x27, 0x4c, 0x0e, 0x48, 0xbc, 0x4b, 0xff, 0xa4, 0xe5, 0xa1, 0x0a, 0x77, 0x14, 0x1b, 0x03,
0xc0, 0xcc, 0xd7, 0x05, 0x3f, 0xc9, 0x97, 0x0c, 0xf0, 0x7b, 0x7a, 0xc1, 0xcf, 0x1e, 0x41, 0x0e,
0xb7, 0x91, 0xde, 0x07, 0xb4, 0xc1, 0x6d, 0xa7, 0x1d, 0xcf, 0x6f, 0xa3, 0x97, 0xcd, 0xaf, 0x3e,
0xbe, 0x7e, 0x1c, 0x31, 0x9a, 0x65, 0xb5, 0x51, 0x52, 0x97, 0x52, 0x7d, 0x98, 0xd4, 0x1f, 0x59,
0x5d, 0xfc, 0xa1, 0xd4, 0x17, 0x55, 0xde, 0x98, 0xc2, 0xf2, 0xed, 0xdf, 0x01, 0x00, 0x00, 0xff,
0xff, 0xda, 0xbf, 0xad, 0x34, 0xf0, 0x08, 0x00, 0x00,
}

Просмотреть файл

@ -46,6 +46,10 @@ var shardedVSchema = `{
}`
func TestUpdateVSchema(t *testing.T) {
if testing.Short() {
t.Skip()
}
defer setVSchema("{}")
// We have to start at least one stream to start the vschema watcher.

Просмотреть файл

@ -61,7 +61,10 @@ func (checker) CheckMySQL() {}
func TestMain(m *testing.M) {
flag.Parse() // Do not remove this comment, import into google3 depends on it
tabletenv.Init()
if testing.Short() {
os.Exit(m.Run())
}
exitCode := func() int {
// Launch MySQL.
@ -71,7 +74,7 @@ func TestMain(m *testing.M) {
Topology: &vttestpb.VTTestTopology{
Keyspaces: []*vttestpb.Keyspace{
{
Name: "vttest",
Name: keyspaceName,
Shards: []*vttestpb.Shard{
{
Name: "0",

Просмотреть файл

@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
@ -46,6 +47,7 @@ type Plan struct {
type ColExpr struct {
ColNum int
Alias sqlparser.ColIdent
Type querypb.Type
Operation Operation
}
@ -55,7 +57,7 @@ type Operation int
// The following are the supported operations on a column.
const (
OpNone = Operation(iota)
OpYearMonth
OpMonth
OpDay
OpHour
)
@ -71,19 +73,19 @@ func (plan *Plan) filter(values []sqltypes.Value) (bool, []sqltypes.Value, error
result := make([]sqltypes.Value, len(plan.ColExprs))
for i, colExpr := range plan.ColExprs {
switch colExpr.Operation {
case OpYearMonth:
case OpMonth:
v, _ := sqltypes.ToInt64(values[colExpr.ColNum])
t := time.Unix(v, 0)
t := time.Unix(v, 0).UTC()
s := fmt.Sprintf("%d%02d", t.Year(), t.Month())
result[i] = sqltypes.NewVarBinary(s)
case OpDay:
v, _ := sqltypes.ToInt64(values[colExpr.ColNum])
t := time.Unix(v, 0)
t := time.Unix(v, 0).UTC()
s := fmt.Sprintf("%d%02d%02d", t.Year(), t.Month(), t.Day())
result[i] = sqltypes.NewVarBinary(s)
case OpHour:
v, _ := sqltypes.ToInt64(values[colExpr.ColNum])
t := time.Unix(v, 0)
t := time.Unix(v, 0).UTC()
s := fmt.Sprintf("%d%02d%02d%02d", t.Year(), t.Month(), t.Day(), t.Hour())
result[i] = sqltypes.NewVarBinary(s)
default:
@ -140,6 +142,7 @@ func buildREPlan(ti *Table, kschema *vindexes.KeyspaceSchema, filter string) (*P
for i, col := range ti.Columns {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = col.Name
plan.ColExprs[i].Type = col.Type
}
if filter == "" {
return plan, nil
@ -187,7 +190,7 @@ func buildTablePlan(ti *Table, kschema *vindexes.KeyspaceSchema, query string) (
}
sel, ok := statement.(*sqlparser.Select)
if !ok {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel))
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(statement))
}
if len(sel.From) > 1 {
return nil, fmt.Errorf("unexpected: %v", sqlparser.String(sel))
@ -220,6 +223,7 @@ func buildTablePlan(ti *Table, kschema *vindexes.KeyspaceSchema, query string) (
for i, col := range ti.Columns {
plan.ColExprs[i].ColNum = i
plan.ColExprs[i].Alias = col.Name
plan.ColExprs[i].Type = col.Type
}
}
@ -265,6 +269,9 @@ func buildTablePlan(ti *Table, kschema *vindexes.KeyspaceSchema, query string) (
if err != nil {
return nil, err
}
if !plan.Vindex.IsUnique() || !plan.Vindex.IsFunctional() {
return nil, fmt.Errorf("vindex must be Unique and Functional to be used for VReplication: %s", vtype)
}
kr, err := selString(funcExpr.Exprs[2])
if err != nil {
return nil, err
@ -291,13 +298,13 @@ func analyzeExpr(ti *Table, expr sqlparser.SelectExpr) (cExpr ColExpr, err error
if err != nil {
return ColExpr{}, err
}
return ColExpr{ColNum: colnum, Alias: expr.Name}, nil
return ColExpr{ColNum: colnum, Alias: expr.Name, Type: ti.Columns[colnum].Type}, nil
case *sqlparser.FuncExpr:
if expr.Distinct || len(expr.Exprs) != 1 {
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(expr))
}
switch fname := expr.Name.Lowered(); fname {
case "yearmonth", "day", "hour":
case "month", "day", "hour":
aInner, ok := expr.Exprs[0].(*sqlparser.AliasedExpr)
if !ok {
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(expr))
@ -306,20 +313,21 @@ func analyzeExpr(ti *Table, expr sqlparser.SelectExpr) (cExpr ColExpr, err error
if !ok {
return ColExpr{}, fmt.Errorf("unsupported: %v", sqlparser.String(expr))
}
if aInner.As.IsEmpty() {
return ColExpr{}, fmt.Errorf("need alias for expression: %v", sqlparser.String(expr))
as := aexpr.As
if as.IsEmpty() {
as = sqlparser.NewColIdent(sqlparser.String(expr))
}
colnum, err := findColumn(ti, innerCol.Name)
if err != nil {
return ColExpr{}, err
}
switch fname {
case "yearmonth":
return ColExpr{ColNum: colnum, Alias: aInner.As, Operation: OpYearMonth}, nil
case "month":
return ColExpr{ColNum: colnum, Alias: as, Type: sqltypes.VarBinary, Operation: OpMonth}, nil
case "day":
return ColExpr{ColNum: colnum, Alias: aInner.As, Operation: OpDay}, nil
return ColExpr{ColNum: colnum, Alias: as, Type: sqltypes.VarBinary, Operation: OpDay}, nil
case "hour":
return ColExpr{ColNum: colnum, Alias: aInner.As, Operation: OpHour}, nil
return ColExpr{ColNum: colnum, Alias: as, Type: sqltypes.VarBinary, Operation: OpHour}, nil
default:
panic("unreachable")
}

Просмотреть файл

@ -0,0 +1,383 @@
/*
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vstreamer
import (
"fmt"
"reflect"
"testing"
"vitess.io/vitess/go/json2"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)
var testKSChema *vindexes.KeyspaceSchema
func init() {
input := `{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
},
"lookup": {
"type": "lookup"
}
},
"tables": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}`
var kspb vschemapb.Keyspace
if err := json2.Unmarshal([]byte(input), &kspb); err != nil {
panic(fmt.Errorf("Unmarshal failed: %v", err))
}
kschema, err := vindexes.BuildKeyspaceSchema(&kspb, keyspaceName)
if err != nil {
panic(err)
}
testKSChema = kschema
}
func TestPlanbuilder(t *testing.T) {
t1 := &Table{
TableMap: &mysql.TableMap{
Name: "t1",
},
Columns: []schema.TableColumn{{
Name: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
Name: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
}
// t1alt has no id column
t1alt := &Table{
TableMap: &mysql.TableMap{
Name: "t1",
},
Columns: []schema.TableColumn{{
Name: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
}
t2 := &Table{
TableMap: &mysql.TableMap{
Name: "t2",
},
Columns: []schema.TableColumn{{
Name: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
Name: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
}
testcases := []struct {
inTable *Table
inRule *binlogdatapb.Rule
outPlan *Plan
outErr string
}{{
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/.*/"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/.*/", Filter: "-80"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
VindexColumn: 0,
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select * from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}},
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}, {
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}},
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select val, id from t1 where in_keyrange(id, 'hash', '-80')"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}, {
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}},
VindexColumn: 1,
},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, month(val) m, day(id), hour(val) from t1 where in_keyrange(m, 'hash', '-80')"},
outPlan: &Plan{
ColExprs: []ColExpr{{
ColNum: 0,
Alias: sqlparser.NewColIdent("id"),
Type: sqltypes.Int64,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("val"),
Type: sqltypes.VarBinary,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("m"),
Type: sqltypes.VarBinary,
Operation: OpMonth,
}, {
ColNum: 0,
Alias: sqlparser.NewColIdent("day(id)"),
Type: sqltypes.VarBinary,
Operation: OpDay,
}, {
ColNum: 1,
Alias: sqlparser.NewColIdent("hour(val)"),
Type: sqltypes.VarBinary,
Operation: OpHour,
}},
VindexColumn: 2,
},
}, {
inTable: t2,
inRule: &binlogdatapb.Rule{Match: "/t1/"},
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/*/"},
outErr: "error parsing regexp: missing argument to repetition operator: `*`",
}, {
inTable: t2,
inRule: &binlogdatapb.Rule{Match: "/.*/", Filter: "-80"},
outErr: `no vschema definition for table t2`,
}, {
inTable: t1alt,
inRule: &binlogdatapb.Rule{Match: "/.*/", Filter: "-80"},
outErr: `column id not found in table t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/.*/", Filter: "80"},
outErr: `malformed spec: doesn't define a range: "80"`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "/.*/", Filter: "-80-"},
outErr: `error parsing keyrange: -80-`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "bad query"},
outErr: `syntax error at position 4 near 'bad'`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "delete from t1"},
outErr: `unexpected: delete from t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select * from t1, t2"},
outErr: `unexpected: select * from t1, t2`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select * from t1 join t2"},
outErr: `unexpected: select * from t1 join t2`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select * from a.t1"},
outErr: `unexpected: select * from a.t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select * from t2"},
outErr: `unexpected: select expression table t2 does not match the table entry name t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select *, id from t1"},
outErr: `unexpected: select *, id from t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where id=1"},
outErr: `unexpected where clause: where id = 1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where max(id)"},
outErr: `unexpected where clause: where max(id)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id)"},
outErr: `unexpected where clause: where in_keyrange(id)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(*, 'hash', '-80')"},
outErr: `unexpected: in_keyrange(*, 'hash', '-80')`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(1, 'hash', '-80')"},
outErr: `unsupported: in_keyrange(1, 'hash', '-80')`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(none, 'hash', '-80')"},
outErr: `keyrange expression does not reference a column in the select list: none`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, 'lookup', '-80')"},
outErr: `vindex must be Unique and Functional to be used for VReplication: lookup`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, 'hash', '80')"},
outErr: `malformed spec: doesn't define a range: "80"`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, 'hash', '-80-')"},
outErr: `unexpected where clause: where in_keyrange(id, 'hash', '-80-')`,
}, {
// analyzeExpr tests.
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, * from t1"},
outErr: `unexpected: *`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select none from t1"},
outErr: `column none not found in table t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, hour(distinct a) from t1"},
outErr: `unsupported: hour(distinct a)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, hour(a, b) from t1"},
outErr: `unsupported: hour(a, b)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, hour(*) from t1"},
outErr: `unsupported: hour(*)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, hour(val+1) from t1"},
outErr: `unsupported: hour(val + 1)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, hour(none) from t1"},
outErr: `column none not found in table t1`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val, max(val) from t1"},
outErr: `unsupported: max(val)`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id+1, val from t1"},
outErr: `unexpected: id + 1`,
}, {
// selString
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, *, '-80')"},
outErr: `unexpected: *`,
}, {
inTable: t1,
inRule: &binlogdatapb.Rule{Match: "t1", Filter: "select id, val from t1 where in_keyrange(id, 1+1, '-80')"},
outErr: `unexpected: 1 + 1`,
}}
for _, tcase := range testcases {
plan, err := buildPlan(tcase.inTable, testKSChema, &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{tcase.inRule},
})
if plan != nil {
plan.Table = nil
plan.Vindex = nil
plan.KeyRange = nil
if !reflect.DeepEqual(tcase.outPlan, plan) {
t.Errorf("Plan(%v, %v):\n%v, want\n%v", tcase.inTable, tcase.inRule, plan, tcase.outPlan)
}
}
gotErr := ""
if err != nil {
gotErr = err.Error()
}
if gotErr != tcase.outErr {
t.Errorf("Plan(%v, %v) err: %v, want %v", tcase.inTable, tcase.inRule, err, tcase.outErr)
}
}
}

Просмотреть файл

@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
)
var packetSize = flag.Int("vstream_packet_size", 10000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.")
@ -120,8 +121,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
// If a single row exceeds the packet size, it will be in its own packet.
bufferAndTransmit := func(vevent *binlogdatapb.VEvent) error {
switch vevent.Type {
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN:
// We never have to send GTID or BEGIN events on their own.
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
// We never have to send GTID, BEGIN or FIELD events on their own.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_ROLLBACK, binlogdatapb.VEventType_DDL:
// COMMIT, ROLLBACK and DDL are terminal. There may be no more events after
@ -151,6 +152,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
curSize += newSize
bufferedEvents = append(bufferedEvents, vevent)
default:
return fmt.Errorf("unexpected event: %v", vevent)
}
return nil
}
@ -306,6 +309,23 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
return nil, err
}
vs.plans[id] = plan
if plan == nil {
return nil, nil
}
fields := make([]*querypb.Field, len(plan.ColExprs))
for i, ce := range plan.ColExprs {
fields[i] = &querypb.Field{
Name: ce.Alias.String(),
Type: ce.Type,
}
}
vevents = append(vevents, &binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_FIELD,
FieldEvent: &binlogdatapb.FieldEvent{
TableName: plan.Table.Name,
Fields: fields,
},
})
case ev.IsWriteRows() || ev.IsDeleteRows() || ev.IsUpdateRows():
// The existence of before and after images can be used to
// identify statememt types. It's also possible that the

Просмотреть файл

@ -34,6 +34,10 @@ type testcase struct {
}
func TestStatements(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table stream1(id int, val varbinary(128), primary key(id))",
"create table stream2(id int, val varbinary(128), primary key(id))",
@ -56,6 +60,7 @@ func TestStatements(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"stream1" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"stream1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`type:ROW row_event:<table_name:"stream1" row_changes:<before:<lengths:1 lengths:3 values:"1aaa" > after:<lengths:1 lengths:3 values:"1bbb" > > > `,
`commit`,
@ -87,7 +92,9 @@ func TestStatements(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"stream1" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"stream1" row_changes:<after:<lengths:1 lengths:3 values:"2bbb" > > > `,
`type:FIELD field_event:<table_name:"stream2" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"stream2" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`type:ROW row_event:<table_name:"stream1" ` +
`row_changes:<before:<lengths:1 lengths:3 values:"1bbb" > after:<lengths:1 lengths:3 values:"1ccc" > > ` +
@ -127,6 +134,10 @@ func TestStatements(t *testing.T) {
}
func TestRegexp(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table yes_stream(id int, val varbinary(128), primary key(id))",
"create table no_stream(id int, val varbinary(128), primary key(id))",
@ -155,6 +166,7 @@ func TestRegexp(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"yes_stream" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"yes_stream" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`type:ROW row_event:<table_name:"yes_stream" row_changes:<before:<lengths:1 lengths:3 values:"1aaa" > after:<lengths:1 lengths:3 values:"1bbb" > > > `,
`commit`,
@ -164,6 +176,10 @@ func TestRegexp(t *testing.T) {
}
func TestREKeyrange(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))",
})
@ -206,6 +222,7 @@ func TestREKeyrange(t *testing.T) {
expectLog(ctx, t, input, ch, [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"t1" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:1 lengths:3 values:"14aaa" > > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<before:<lengths:1 lengths:1 lengths:3 values:"14aaa" > after:<lengths:1 lengths:1 lengths:3 values:"24aaa" > > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<before:<lengths:1 lengths:1 lengths:3 values:"24aaa" > > > `,
@ -253,6 +270,10 @@ func TestREKeyrange(t *testing.T) {
}
func TestSelectFilter(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))",
})
@ -280,6 +301,7 @@ func TestSelectFilter(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"t1" fields:<name:"id2" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`commit`,
}},
@ -287,9 +309,69 @@ func TestSelectFilter(t *testing.T) {
runCases(t, filter, testcases)
}
func TestSelectExpressions(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table expr_test(id int, val bigint, primary key(id))",
})
defer execStatements(t, []string{
"drop table expr_test",
})
engine.se.Reload(context.Background())
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "expr_test",
Filter: "select id, val, month(val), day(val), hour(val) from expr_test",
}},
}
testcases := []testcase{{
input: []string{
"begin",
"insert into expr_test values (1, 1546392881)",
"commit",
},
// MySQL issues GTID->BEGIN.
// MariaDB issues BEGIN->GTID.
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"expr_test" ` +
`fields:<name:"id" type:INT32 > ` +
`fields:<name:"val" type:INT64 > ` +
`fields:<name:"month(val)" type:VARBINARY > ` +
`fields:<name:"day(val)" type:VARBINARY > ` +
`fields:<name:"hour(val)" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"expr_test" row_changes:<after:<lengths:1 lengths:10 lengths:6 lengths:8 lengths:10 values:"` +
`1` +
`1546392881` +
`201901` +
`20190102` +
`2019010201` +
`" > > > `,
`commit`,
}},
}}
runCases(t, filter, testcases)
}
func TestDDLAddColumn(t *testing.T) {
execStatement(t, "create table ddl_test1(id int, val1 varbinary(128), primary key(id))")
defer execStatement(t, "drop table ddl_test1")
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table ddl_test1(id int, val1 varbinary(128), primary key(id))",
"create table ddl_test2(id int, val1 varbinary(128), primary key(id))",
})
defer execStatements(t, []string{
"drop table ddl_test1",
"drop table ddl_test2",
})
// Record position before the next few statements.
pos, err := mysqld.MasterPosition()
@ -297,20 +379,37 @@ func TestDDLAddColumn(t *testing.T) {
t.Fatal(err)
}
execStatements(t, []string{
"begin",
"insert into ddl_test1 values(1, 'aaa')",
"insert into ddl_test2 values(1, 'aaa')",
"commit",
// Adding columns is allowed.
"alter table ddl_test1 add column val2 varbinary(128)",
"alter table ddl_test2 add column val2 varbinary(128)",
"begin",
"insert into ddl_test1 values(2, 'bbb', 'ccc')",
"insert into ddl_test2 values(2, 'bbb', 'ccc')",
"commit",
})
engine.se.Reload(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Test RE as well as select-based filters.
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "ddl_test2",
Filter: "select * from ddl_test2",
}, {
Match: "/.*/",
}},
}
ch := make(chan []*binlogdatapb.VEvent)
go func() {
defer close(ch)
if err := vstream(ctx, t, pos, nil, ch); err != nil {
if err := vstream(ctx, t, pos, filter, ch); err != nil {
t.Fatal(err)
}
}()
@ -318,22 +417,35 @@ func TestDDLAddColumn(t *testing.T) {
// Current schema has 3 columns, but they'll be truncated to match the two columns in the event.
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"ddl_test1" fields:<name:"id" type:INT32 > fields:<name:"val1" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"ddl_test1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`type:FIELD field_event:<table_name:"ddl_test2" fields:<name:"id" type:INT32 > fields:<name:"val1" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"ddl_test2" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`commit`,
}, {
`gtid`,
`type:DDL ddl:"alter table ddl_test1 add column val2 varbinary(128)" `,
}, {
`gtid`,
`type:DDL ddl:"alter table ddl_test2 add column val2 varbinary(128)" `,
}, {
// The plan will be updated to now include the third column
// because the new table map will have three columns.
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"ddl_test1" fields:<name:"id" type:INT32 > fields:<name:"val1" type:VARBINARY > fields:<name:"val2" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"ddl_test1" row_changes:<after:<lengths:1 lengths:3 lengths:3 values:"2bbbccc" > > > `,
`type:FIELD field_event:<table_name:"ddl_test2" fields:<name:"id" type:INT32 > fields:<name:"val1" type:VARBINARY > fields:<name:"val2" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"ddl_test2" row_changes:<after:<lengths:1 lengths:3 lengths:3 values:"2bbbccc" > > > `,
`commit`,
}})
}
func TestDDLDropColumn(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatement(t, "create table ddl_test2(id int, val1 varbinary(128), val2 varbinary(128), primary key(id))")
defer execStatement(t, "drop table ddl_test2")
@ -360,13 +472,17 @@ func TestDDLDropColumn(t *testing.T) {
}()
defer close(ch)
err = vstream(ctx, t, pos, nil, ch)
want := "Column count doesn't match value"
if err == nil || strings.Contains(err.Error(), want) {
want := "cannot determine table columns"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain %s", err, want)
}
}
func TestBuffering(t *testing.T) {
if testing.Short() {
t.Skip()
}
savedSize := *packetSize
*packetSize = 10
defer func() { *packetSize = savedSize }()
@ -386,6 +502,7 @@ func TestBuffering(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"packet_test" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"packet_test" row_changes:<after:<lengths:1 lengths:3 values:"1123" > > > `,
`type:ROW row_event:<table_name:"packet_test" row_changes:<after:<lengths:1 lengths:3 values:"2456" > > > `,
`commit`,
@ -462,18 +579,24 @@ func TestBuffering(t *testing.T) {
}
func TestTypes(t *testing.T) {
if testing.Short() {
t.Skip()
}
// Modeled after vttablet endtoend compatibility tests.
execStatements(t, []string{
"create table vitess_ints(tiny tinyint, tinyu tinyint unsigned, small smallint, smallu smallint unsigned, medium mediumint, mediumu mediumint unsigned, normal int, normalu int unsigned, big bigint, bigu bigint unsigned, y year, primary key(tiny))",
"create table vitess_fracts(id int, deci decimal(5,2), num numeric(5,2), f float, d double, primary key(id))",
"create table vitess_strings(vb varbinary(16), c char(16), vc varchar(16), b binary(4), tb tinyblob, bl blob, ttx tinytext, tx text, en enum('a','b'), s set('a','b'), primary key(vb))",
"create table vitess_misc(id int, b bit(8), d date, dt datetime, t time, g geometry, primary key(id))",
"create table vitess_null(id int, val varbinary(128), primary key(id))",
})
defer execStatements(t, []string{
"drop table vitess_ints",
"drop table vitess_fracts",
"drop table vitess_strings",
"drop table vitess_misc",
"drop table vitess_null",
})
engine.se.Reload(context.Background())
@ -484,6 +607,18 @@ func TestTypes(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_ints" ` +
`fields:<name:"tiny" type:INT8 > ` +
`fields:<name:"tinyu" type:UINT8 > ` +
`fields:<name:"small" type:INT16 > ` +
`fields:<name:"smallu" type:UINT16 > ` +
`fields:<name:"medium" type:INT24 > ` +
`fields:<name:"mediumu" type:UINT24 > ` +
`fields:<name:"normal" type:INT32 > ` +
`fields:<name:"normalu" type:UINT32 > ` +
`fields:<name:"big" type:INT64 > ` +
`fields:<name:"bigu" type:UINT64 > ` +
`fields:<name:"y" type:YEAR > > `,
`type:ROW row_event:<table_name:"vitess_ints" row_changes:<after:<lengths:4 lengths:3 lengths:6 lengths:5 lengths:8 lengths:8 lengths:11 lengths:10 lengths:20 lengths:20 lengths:4 values:"` +
`-128` +
`255` +
@ -506,6 +641,12 @@ func TestTypes(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_fracts" ` +
`fields:<name:"id" type:INT32 > ` +
`fields:<name:"deci" type:DECIMAL > ` +
`fields:<name:"num" type:DECIMAL > ` +
`fields:<name:"f" type:FLOAT32 > ` +
`fields:<name:"d" type:FLOAT64 > > `,
`type:ROW row_event:<table_name:"vitess_fracts" row_changes:<after:<lengths:1 lengths:4 lengths:4 lengths:8 lengths:8 values:"` +
`1` +
`1.99` +
@ -523,6 +664,17 @@ func TestTypes(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_strings" ` +
`fields:<name:"vb" type:VARBINARY > ` +
`fields:<name:"c" type:CHAR > ` +
`fields:<name:"vc" type:VARCHAR > ` +
`fields:<name:"b" type:BINARY > ` +
`fields:<name:"tb" type:BLOB > ` +
`fields:<name:"bl" type:BLOB > ` +
`fields:<name:"ttx" type:TEXT > ` +
`fields:<name:"tx" type:TEXT > ` +
`fields:<name:"en" type:ENUM > ` +
`fields:<name:"s" type:SET > > `,
`type:ROW row_event:<table_name:"vitess_strings" row_changes:<after:<lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 lengths:1 ` +
`values:"abcdefgh13" > > > `,
`commit`,
@ -535,6 +687,13 @@ func TestTypes(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_misc" ` +
`fields:<name:"id" type:INT32 > ` +
`fields:<name:"b" type:BIT > ` +
`fields:<name:"d" type:DATE > ` +
`fields:<name:"dt" type:DATETIME > ` +
`fields:<name:"t" type:TIME > ` +
`fields:<name:"g" type:GEOMETRY > > `,
`type:ROW row_event:<table_name:"vitess_misc" row_changes:<after:<lengths:1 lengths:1 lengths:10 lengths:19 lengths:8 lengths:25 values:"` +
`1` +
`\001` +
@ -545,11 +704,26 @@ func TestTypes(t *testing.T) {
`" > > > `,
`commit`,
}},
}, {
input: []string{
"insert into vitess_null values(1, null)",
},
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_null" fields:<name:"id" type:INT32 > fields:<name:"val" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_null" row_changes:<after:<lengths:1 lengths:-1 values:"1" > > > `,
`commit`,
}},
}}
runCases(t, nil, testcases)
}
func TestJSON(t *testing.T) {
if testing.Short() {
t.Skip()
}
// JSON is supported only after mysql57.
if err := mysqld.ExecuteSuperQuery(context.Background(), "create table vitess_json(id int default 1, val json, primary key(id))"); err != nil {
// If it's a syntax error, MySQL is an older version. Skip this test.
@ -568,6 +742,7 @@ func TestJSON(t *testing.T) {
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`type:FIELD field_event:<table_name:"vitess_json" fields:<name:"id" type:INT32 > fields:<name:"val" type:JSON > > `,
`type:ROW row_event:<table_name:"vitess_json" row_changes:<after:<lengths:1 lengths:24 values:"1JSON_OBJECT('foo','bar')" > > > `,
`commit`,
}},
@ -575,6 +750,120 @@ func TestJSON(t *testing.T) {
runCases(t, nil, testcases)
}
func TestExternalTable(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create database external",
"create table external.ext(id int, val varbinary(128), primary key(id))",
})
defer execStatements(t, []string{
"drop database external",
})
engine.se.Reload(context.Background())
testcases := []testcase{{
input: []string{
"begin",
"insert into external.ext values (1, 'aaa')",
"commit",
},
// External table events don't get sent.
output: [][]string{{
`gtid|begin`,
`gtid|begin`,
`commit`,
}},
}}
runCases(t, nil, testcases)
}
func TestMinimalMode(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table t1(id int, val1 varbinary(128), val2 varbinary(128), primary key(id))",
"insert into t1 values(1, 'aaa', 'bbb')",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
// Record position before the next few statements.
pos, err := mysqld.MasterPosition()
if err != nil {
t.Fatal(err)
}
execStatements(t, []string{
"set @@session.binlog_row_image='minimal'",
"update t1 set val1='bbb' where id=1",
"set @@session.binlog_row_image='full'",
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan []*binlogdatapb.VEvent)
go func() {
for evs := range ch {
t.Errorf("received: %v", evs)
}
}()
defer close(ch)
err = vstream(ctx, t, pos, nil, ch)
want := "partial row image encountered"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain '%s'", err, want)
}
}
func TestStatementMode(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table t1(id int, val1 varbinary(128), val2 varbinary(128), primary key(id))",
"insert into t1 values(1, 'aaa', 'bbb')",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
// Record position before the next few statements.
pos, err := mysqld.MasterPosition()
if err != nil {
t.Fatal(err)
}
execStatements(t, []string{
"set @@session.binlog_format='statement'",
"update t1 set val1='bbb' where id=1",
"set @@session.binlog_format='row'",
})
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan []*binlogdatapb.VEvent)
go func() {
for evs := range ch {
t.Errorf("received: %v", evs)
}
}()
defer close(ch)
err = vstream(ctx, t, pos, nil, ch)
want := "unexpected statement type"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("err: %v, must contain '%s'", err, want)
}
}
func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())

Просмотреть файл

@ -172,6 +172,7 @@ enum VEventType {
SET = 10;
OTHER = 11;
ROW = 12;
FIELD = 13;
}
// RowChange represents one row change
@ -186,12 +187,18 @@ message RowEvent {
repeated RowChange row_changes = 2;
}
message FieldEvent {
string table_name = 1;
repeated query.Field fields = 2;
}
// VEvent represents a vstream event
message VEvent {
VEventType type = 1;
string gtid = 2;
string ddl = 3;
RowEvent row_event = 4;
FieldEvent field_event = 5;
}
// VStreamRequest is the payload for VStream

Просмотреть файл

@ -22,7 +22,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='binlogdata',
syntax='proto3',
serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'),
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\"r\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12\'\n\trow_event\x18\x04 \x01(\x0b\x32\x14.binlogdata.RowEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*\x9f\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x42)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xb5\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\"\x9f\x01\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x0c\n\x04gtid\x18\x02 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x03 \x01(\t\x12\'\n\trow_event\x18\x04 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x05 \x01(\x0b\x32\x16.binlogdata.FieldEvent\"F\n\x0eVStreamRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\"\n\x06\x66ilter\x18\x02 \x01(\x0b\x32\x12.binlogdata.Filter\"4\n\x0fVStreamResponse\x12!\n\x05\x65vent\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*\xaa\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\rB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
,
dependencies=[query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,])
@ -84,11 +84,15 @@ _VEVENTTYPE = _descriptor.EnumDescriptor(
name='ROW', index=12, number=12,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FIELD', index=13, number=13,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=1594,
serialized_end=1753,
serialized_start=1704,
serialized_end=1874,
)
_sym_db.RegisterEnumDescriptor(_VEVENTTYPE)
@ -106,6 +110,7 @@ DELETE = 9
SET = 10
OTHER = 11
ROW = 12
FIELD = 13
_BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor(
@ -654,6 +659,44 @@ _ROWEVENT = _descriptor.Descriptor(
)
_FIELDEVENT = _descriptor.Descriptor(
name='FieldEvent',
full_name='binlogdata.FieldEvent',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='table_name', full_name='binlogdata.FieldEvent.table_name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='fields', full_name='binlogdata.FieldEvent.fields', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
serialized_options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=1351,
serialized_end=1413,
)
_VEVENT = _descriptor.Descriptor(
name='VEvent',
full_name='binlogdata.VEvent',
@ -689,6 +732,13 @@ _VEVENT = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='field_event', full_name='binlogdata.VEvent.field_event', index=4,
number=5, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
@ -701,8 +751,8 @@ _VEVENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1351,
serialized_end=1465,
serialized_start=1416,
serialized_end=1575,
)
@ -739,8 +789,8 @@ _VSTREAMREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1467,
serialized_end=1537,
serialized_start=1577,
serialized_end=1647,
)
@ -770,8 +820,8 @@ _VSTREAMRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1539,
serialized_end=1591,
serialized_start=1649,
serialized_end=1701,
)
_BINLOGTRANSACTION_STATEMENT.fields_by_name['category'].enum_type = _BINLOGTRANSACTION_STATEMENT_CATEGORY
@ -792,8 +842,10 @@ _BINLOGSOURCE.fields_by_name['filter'].message_type = _FILTER
_ROWCHANGE.fields_by_name['before'].message_type = query__pb2._ROW
_ROWCHANGE.fields_by_name['after'].message_type = query__pb2._ROW
_ROWEVENT.fields_by_name['row_changes'].message_type = _ROWCHANGE
_FIELDEVENT.fields_by_name['fields'].message_type = query__pb2._FIELD
_VEVENT.fields_by_name['type'].enum_type = _VEVENTTYPE
_VEVENT.fields_by_name['row_event'].message_type = _ROWEVENT
_VEVENT.fields_by_name['field_event'].message_type = _FIELDEVENT
_VSTREAMREQUEST.fields_by_name['filter'].message_type = _FILTER
_VSTREAMRESPONSE.fields_by_name['event'].message_type = _VEVENT
DESCRIPTOR.message_types_by_name['Charset'] = _CHARSET
@ -807,6 +859,7 @@ DESCRIPTOR.message_types_by_name['Filter'] = _FILTER
DESCRIPTOR.message_types_by_name['BinlogSource'] = _BINLOGSOURCE
DESCRIPTOR.message_types_by_name['RowChange'] = _ROWCHANGE
DESCRIPTOR.message_types_by_name['RowEvent'] = _ROWEVENT
DESCRIPTOR.message_types_by_name['FieldEvent'] = _FIELDEVENT
DESCRIPTOR.message_types_by_name['VEvent'] = _VEVENT
DESCRIPTOR.message_types_by_name['VStreamRequest'] = _VSTREAMREQUEST
DESCRIPTOR.message_types_by_name['VStreamResponse'] = _VSTREAMRESPONSE
@ -898,6 +951,13 @@ RowEvent = _reflection.GeneratedProtocolMessageType('RowEvent', (_message.Messag
))
_sym_db.RegisterMessage(RowEvent)
FieldEvent = _reflection.GeneratedProtocolMessageType('FieldEvent', (_message.Message,), dict(
DESCRIPTOR = _FIELDEVENT,
__module__ = 'binlogdata_pb2'
# @@protoc_insertion_point(class_scope:binlogdata.FieldEvent)
))
_sym_db.RegisterMessage(FieldEvent)
VEvent = _reflection.GeneratedProtocolMessageType('VEvent', (_message.Message,), dict(
DESCRIPTOR = _VEVENT,
__module__ = 'binlogdata_pb2'