Merge pull request #4637 from planetscale/ss-vheart

vreplication: improved lag tracking
This commit is contained in:
Sugu Sougoumarane 2019-03-03 13:28:01 -08:00 коммит произвёл GitHub
Родитель 6585831f8f bec54fd6f2
Коммит 31178f975a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 226 добавлений и 152 удалений

Просмотреть файл

@ -48,7 +48,7 @@ func (x OnDDLAction) String() string {
return proto.EnumName(OnDDLAction_name, int32(x))
}
func (OnDDLAction) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{0}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{0}
}
// VEventType enumerates the event types.
@ -57,20 +57,21 @@ func (OnDDLAction) EnumDescriptor() ([]byte, []int) {
type VEventType int32
const (
VEventType_UNKNOWN VEventType = 0
VEventType_GTID VEventType = 1
VEventType_BEGIN VEventType = 2
VEventType_COMMIT VEventType = 3
VEventType_ROLLBACK VEventType = 4
VEventType_DDL VEventType = 5
VEventType_INSERT VEventType = 6
VEventType_REPLACE VEventType = 7
VEventType_UPDATE VEventType = 8
VEventType_DELETE VEventType = 9
VEventType_SET VEventType = 10
VEventType_OTHER VEventType = 11
VEventType_ROW VEventType = 12
VEventType_FIELD VEventType = 13
VEventType_UNKNOWN VEventType = 0
VEventType_GTID VEventType = 1
VEventType_BEGIN VEventType = 2
VEventType_COMMIT VEventType = 3
VEventType_ROLLBACK VEventType = 4
VEventType_DDL VEventType = 5
VEventType_INSERT VEventType = 6
VEventType_REPLACE VEventType = 7
VEventType_UPDATE VEventType = 8
VEventType_DELETE VEventType = 9
VEventType_SET VEventType = 10
VEventType_OTHER VEventType = 11
VEventType_ROW VEventType = 12
VEventType_FIELD VEventType = 13
VEventType_HEARTBEAT VEventType = 14
)
var VEventType_name = map[int32]string{
@ -88,29 +89,31 @@ var VEventType_name = map[int32]string{
11: "OTHER",
12: "ROW",
13: "FIELD",
14: "HEARTBEAT",
}
var VEventType_value = map[string]int32{
"UNKNOWN": 0,
"GTID": 1,
"BEGIN": 2,
"COMMIT": 3,
"ROLLBACK": 4,
"DDL": 5,
"INSERT": 6,
"REPLACE": 7,
"UPDATE": 8,
"DELETE": 9,
"SET": 10,
"OTHER": 11,
"ROW": 12,
"FIELD": 13,
"UNKNOWN": 0,
"GTID": 1,
"BEGIN": 2,
"COMMIT": 3,
"ROLLBACK": 4,
"DDL": 5,
"INSERT": 6,
"REPLACE": 7,
"UPDATE": 8,
"DELETE": 9,
"SET": 10,
"OTHER": 11,
"ROW": 12,
"FIELD": 13,
"HEARTBEAT": 14,
}
func (x VEventType) String() string {
return proto.EnumName(VEventType_name, int32(x))
}
func (VEventType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1}
}
type BinlogTransaction_Statement_Category int32
@ -158,7 +161,7 @@ func (x BinlogTransaction_Statement_Category) String() string {
return proto.EnumName(BinlogTransaction_Statement_Category_name, int32(x))
}
func (BinlogTransaction_Statement_Category) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1, 0, 0}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1, 0, 0}
}
// Charset is the per-statement charset info from a QUERY_EVENT binlog entry.
@ -178,7 +181,7 @@ func (m *Charset) Reset() { *m = Charset{} }
func (m *Charset) String() string { return proto.CompactTextString(m) }
func (*Charset) ProtoMessage() {}
func (*Charset) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{0}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{0}
}
func (m *Charset) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Charset.Unmarshal(m, b)
@ -235,7 +238,7 @@ func (m *BinlogTransaction) Reset() { *m = BinlogTransaction{} }
func (m *BinlogTransaction) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction) ProtoMessage() {}
func (*BinlogTransaction) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1}
}
func (m *BinlogTransaction) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction.Unmarshal(m, b)
@ -285,7 +288,7 @@ func (m *BinlogTransaction_Statement) Reset() { *m = BinlogTransaction_S
func (m *BinlogTransaction_Statement) String() string { return proto.CompactTextString(m) }
func (*BinlogTransaction_Statement) ProtoMessage() {}
func (*BinlogTransaction_Statement) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{1, 0}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{1, 0}
}
func (m *BinlogTransaction_Statement) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogTransaction_Statement.Unmarshal(m, b)
@ -343,7 +346,7 @@ func (m *StreamKeyRangeRequest) Reset() { *m = StreamKeyRangeRequest{} }
func (m *StreamKeyRangeRequest) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeRequest) ProtoMessage() {}
func (*StreamKeyRangeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{2}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{2}
}
func (m *StreamKeyRangeRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeRequest.Unmarshal(m, b)
@ -396,7 +399,7 @@ func (m *StreamKeyRangeResponse) Reset() { *m = StreamKeyRangeResponse{}
func (m *StreamKeyRangeResponse) String() string { return proto.CompactTextString(m) }
func (*StreamKeyRangeResponse) ProtoMessage() {}
func (*StreamKeyRangeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{3}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{3}
}
func (m *StreamKeyRangeResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamKeyRangeResponse.Unmarshal(m, b)
@ -440,7 +443,7 @@ func (m *StreamTablesRequest) Reset() { *m = StreamTablesRequest{} }
func (m *StreamTablesRequest) String() string { return proto.CompactTextString(m) }
func (*StreamTablesRequest) ProtoMessage() {}
func (*StreamTablesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{4}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{4}
}
func (m *StreamTablesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesRequest.Unmarshal(m, b)
@ -493,7 +496,7 @@ func (m *StreamTablesResponse) Reset() { *m = StreamTablesResponse{} }
func (m *StreamTablesResponse) String() string { return proto.CompactTextString(m) }
func (*StreamTablesResponse) ProtoMessage() {}
func (*StreamTablesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{5}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{5}
}
func (m *StreamTablesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_StreamTablesResponse.Unmarshal(m, b)
@ -538,7 +541,7 @@ func (m *Rule) Reset() { *m = Rule{} }
func (m *Rule) String() string { return proto.CompactTextString(m) }
func (*Rule) ProtoMessage() {}
func (*Rule) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{6}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{6}
}
func (m *Rule) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Rule.Unmarshal(m, b)
@ -585,7 +588,7 @@ func (m *Filter) Reset() { *m = Filter{} }
func (m *Filter) String() string { return proto.CompactTextString(m) }
func (*Filter) ProtoMessage() {}
func (*Filter) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{7}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{7}
}
func (m *Filter) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Filter.Unmarshal(m, b)
@ -640,7 +643,7 @@ func (m *BinlogSource) Reset() { *m = BinlogSource{} }
func (m *BinlogSource) String() string { return proto.CompactTextString(m) }
func (*BinlogSource) ProtoMessage() {}
func (*BinlogSource) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{8}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{8}
}
func (m *BinlogSource) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_BinlogSource.Unmarshal(m, b)
@ -722,7 +725,7 @@ func (m *RowChange) Reset() { *m = RowChange{} }
func (m *RowChange) String() string { return proto.CompactTextString(m) }
func (*RowChange) ProtoMessage() {}
func (*RowChange) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{9}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{9}
}
func (m *RowChange) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RowChange.Unmarshal(m, b)
@ -769,7 +772,7 @@ func (m *RowEvent) Reset() { *m = RowEvent{} }
func (m *RowEvent) String() string { return proto.CompactTextString(m) }
func (*RowEvent) ProtoMessage() {}
func (*RowEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{10}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{10}
}
func (m *RowEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RowEvent.Unmarshal(m, b)
@ -815,7 +818,7 @@ func (m *FieldEvent) Reset() { *m = FieldEvent{} }
func (m *FieldEvent) String() string { return proto.CompactTextString(m) }
func (*FieldEvent) ProtoMessage() {}
func (*FieldEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{11}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{11}
}
func (m *FieldEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_FieldEvent.Unmarshal(m, b)
@ -851,22 +854,24 @@ func (m *FieldEvent) GetFields() []*query.Field {
// VEvent represents a vstream event
type VEvent struct {
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"`
Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"`
RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"`
Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"`
RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"`
// current_time specifies the current time to handle clock skew.
CurrentTime int64 `protobuf:"varint,20,opt,name=current_time,json=currentTime,proto3" json:"current_time,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *VEvent) Reset() { *m = VEvent{} }
func (m *VEvent) String() string { return proto.CompactTextString(m) }
func (*VEvent) ProtoMessage() {}
func (*VEvent) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{12}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{12}
}
func (m *VEvent) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VEvent.Unmarshal(m, b)
@ -928,6 +933,13 @@ func (m *VEvent) GetFieldEvent() *FieldEvent {
return nil
}
func (m *VEvent) GetCurrentTime() int64 {
if m != nil {
return m.CurrentTime
}
return 0
}
// VStreamRequest is the payload for VStream
type VStreamRequest struct {
EffectiveCallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=effective_caller_id,json=effectiveCallerId,proto3" json:"effective_caller_id,omitempty"`
@ -944,7 +956,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} }
func (m *VStreamRequest) String() string { return proto.CompactTextString(m) }
func (*VStreamRequest) ProtoMessage() {}
func (*VStreamRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{13}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{13}
}
func (m *VStreamRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamRequest.Unmarshal(m, b)
@ -1011,7 +1023,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} }
func (m *VStreamResponse) String() string { return proto.CompactTextString(m) }
func (*VStreamResponse) ProtoMessage() {}
func (*VStreamResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_binlogdata_6d214635eb8c538c, []int{14}
return fileDescriptor_binlogdata_60517ed2deb82a7b, []int{14}
}
func (m *VStreamResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VStreamResponse.Unmarshal(m, b)
@ -1060,82 +1072,84 @@ func init() {
proto.RegisterEnum("binlogdata.BinlogTransaction_Statement_Category", BinlogTransaction_Statement_Category_name, BinlogTransaction_Statement_Category_value)
}
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_6d214635eb8c538c) }
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_binlogdata_60517ed2deb82a7b) }
var fileDescriptor_binlogdata_6d214635eb8c538c = []byte{
// 1184 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5b, 0x6e, 0xdb, 0x56,
0x13, 0x8e, 0x44, 0x8a, 0x12, 0x87, 0x8e, 0x4d, 0x1f, 0x5f, 0x7e, 0xc1, 0xf8, 0x03, 0x18, 0x44,
0xdb, 0xb8, 0x06, 0x2a, 0xa7, 0xea, 0xed, 0xa9, 0x2d, 0x2c, 0x91, 0x71, 0x95, 0xd0, 0x92, 0x73,
0xcc, 0x24, 0x45, 0x5e, 0x08, 0x9a, 0x3c, 0xb2, 0x09, 0x53, 0xa4, 0x4c, 0x1e, 0xcb, 0xd5, 0x0a,
0xba, 0x80, 0xbe, 0x76, 0x03, 0xed, 0x42, 0xba, 0x92, 0x76, 0x1f, 0xc5, 0xb9, 0x90, 0x92, 0x1c,
0xa0, 0x71, 0x1f, 0xfa, 0x36, 0xf7, 0x33, 0xf3, 0xcd, 0x70, 0x86, 0x60, 0x5e, 0xc4, 0x69, 0x92,
0x5d, 0x46, 0x01, 0x0d, 0x3a, 0xd3, 0x3c, 0xa3, 0x19, 0x82, 0x85, 0x64, 0xcf, 0x98, 0xd1, 0x7c,
0x1a, 0x0a, 0xc5, 0x9e, 0x71, 0x73, 0x4b, 0xf2, 0xb9, 0x64, 0xd6, 0x69, 0x36, 0xcd, 0x16, 0x5e,
0xd6, 0x29, 0x34, 0xfb, 0x57, 0x41, 0x5e, 0x10, 0x8a, 0x76, 0x41, 0x0b, 0x93, 0x98, 0xa4, 0xb4,
0x5d, 0xdb, 0xaf, 0x1d, 0x34, 0xb0, 0xe4, 0x10, 0x02, 0x35, 0xcc, 0xd2, 0xb4, 0x5d, 0xe7, 0x52,
0x4e, 0x33, 0xdb, 0x82, 0xe4, 0x33, 0x92, 0xb7, 0x15, 0x61, 0x2b, 0x38, 0xeb, 0x2f, 0x05, 0x36,
0x7b, 0x3c, 0x0f, 0x2f, 0x0f, 0xd2, 0x22, 0x08, 0x69, 0x9c, 0xa5, 0xe8, 0x04, 0xa0, 0xa0, 0x01,
0x25, 0x13, 0x92, 0xd2, 0xa2, 0x5d, 0xdb, 0x57, 0x0e, 0x8c, 0xee, 0xd3, 0xce, 0x52, 0x05, 0xef,
0xb9, 0x74, 0xce, 0x4b, 0x7b, 0xbc, 0xe4, 0x8a, 0xba, 0x60, 0x90, 0x19, 0x49, 0xa9, 0x4f, 0xb3,
0x6b, 0x92, 0xb6, 0xd5, 0xfd, 0xda, 0x81, 0xd1, 0xdd, 0xec, 0x88, 0x02, 0x1d, 0xa6, 0xf1, 0x98,
0x02, 0x03, 0xa9, 0xe8, 0xbd, 0x3f, 0xea, 0xa0, 0x57, 0xd1, 0x90, 0x0b, 0xad, 0x30, 0xa0, 0xe4,
0x32, 0xcb, 0xe7, 0xbc, 0xcc, 0xf5, 0xee, 0xb3, 0x07, 0x26, 0xd2, 0xe9, 0x4b, 0x3f, 0x5c, 0x45,
0x40, 0x9f, 0x41, 0x33, 0x14, 0xe8, 0x71, 0x74, 0x8c, 0xee, 0xd6, 0x72, 0x30, 0x09, 0x2c, 0x2e,
0x6d, 0x90, 0x09, 0x4a, 0x71, 0x93, 0x70, 0xc8, 0xd6, 0x30, 0x23, 0xad, 0xdf, 0x6a, 0xd0, 0x2a,
0xe3, 0xa2, 0x2d, 0xd8, 0xe8, 0xb9, 0xfe, 0xeb, 0x21, 0x76, 0xfa, 0xa3, 0x93, 0xe1, 0xe0, 0x9d,
0x63, 0x9b, 0x8f, 0xd0, 0x1a, 0xb4, 0x7a, 0xae, 0xdf, 0x73, 0x4e, 0x06, 0x43, 0xb3, 0x86, 0x1e,
0x83, 0xde, 0x73, 0xfd, 0xfe, 0xe8, 0xf4, 0x74, 0xe0, 0x99, 0x75, 0xb4, 0x01, 0x46, 0xcf, 0xf5,
0xf1, 0xc8, 0x75, 0x7b, 0xc7, 0xfd, 0x97, 0xa6, 0x82, 0x76, 0x60, 0xb3, 0xe7, 0xfa, 0xf6, 0xa9,
0xeb, 0xdb, 0xce, 0x19, 0x76, 0xfa, 0xc7, 0x9e, 0x63, 0x9b, 0x2a, 0x02, 0xd0, 0x98, 0xd8, 0x76,
0xcd, 0x86, 0xa4, 0xcf, 0x1d, 0xcf, 0xd4, 0x64, 0xb8, 0xc1, 0xf0, 0xdc, 0xc1, 0x9e, 0xd9, 0x94,
0xec, 0xeb, 0x33, 0xfb, 0xd8, 0x73, 0xcc, 0x96, 0x64, 0x6d, 0xc7, 0x75, 0x3c, 0xc7, 0xd4, 0x5f,
0xa8, 0xad, 0xba, 0xa9, 0xbc, 0x50, 0x5b, 0x8a, 0xa9, 0x5a, 0xbf, 0xd4, 0x60, 0xe7, 0x9c, 0xe6,
0x24, 0x98, 0xbc, 0x24, 0x73, 0x1c, 0xa4, 0x97, 0x04, 0x93, 0x9b, 0x5b, 0x52, 0x50, 0xb4, 0x07,
0xad, 0x69, 0x56, 0xc4, 0x0c, 0x3b, 0x0e, 0xb0, 0x8e, 0x2b, 0x1e, 0x1d, 0x81, 0x7e, 0x4d, 0xe6,
0x7e, 0xce, 0xec, 0x25, 0x60, 0xa8, 0x53, 0x0d, 0x64, 0x15, 0xa9, 0x75, 0x2d, 0xa9, 0x65, 0x7c,
0x95, 0x0f, 0xe3, 0x6b, 0x8d, 0x61, 0xf7, 0x7e, 0x52, 0xc5, 0x34, 0x4b, 0x0b, 0x82, 0x5c, 0x40,
0xc2, 0xd1, 0xa7, 0x8b, 0xde, 0xf2, 0xfc, 0x8c, 0xee, 0x93, 0x7f, 0x1c, 0x00, 0xbc, 0x79, 0x71,
0x5f, 0x64, 0xfd, 0x04, 0x5b, 0xe2, 0x1d, 0x2f, 0xb8, 0x48, 0x48, 0xf1, 0x90, 0xd2, 0x77, 0x41,
0xa3, 0xdc, 0xb8, 0x5d, 0xdf, 0x57, 0x0e, 0x74, 0x2c, 0xb9, 0x7f, 0x5b, 0x61, 0x04, 0xdb, 0xab,
0x2f, 0xff, 0x27, 0xf5, 0x7d, 0x09, 0x2a, 0xbe, 0x4d, 0x08, 0xda, 0x86, 0xc6, 0x24, 0xa0, 0xe1,
0x95, 0xac, 0x46, 0x30, 0xac, 0x94, 0x71, 0x9c, 0x50, 0x92, 0xf3, 0x16, 0xea, 0x58, 0x72, 0xd6,
0x33, 0xd0, 0x9e, 0x73, 0x0a, 0x7d, 0x02, 0x8d, 0xfc, 0x96, 0xd5, 0x2a, 0x3e, 0x75, 0x73, 0x39,
0x01, 0x16, 0x18, 0x0b, 0xb5, 0xf5, 0x6b, 0x1d, 0xd6, 0x44, 0x42, 0xe7, 0xd9, 0x6d, 0x1e, 0x12,
0x86, 0xe0, 0x35, 0x99, 0x17, 0xd3, 0x20, 0x24, 0x25, 0x82, 0x25, 0xcf, 0x92, 0x29, 0xae, 0x82,
0x3c, 0x92, 0xaf, 0x0a, 0x06, 0x7d, 0x05, 0x06, 0x47, 0x92, 0xfa, 0x74, 0x3e, 0x25, 0x1c, 0xc3,
0xf5, 0xee, 0xf6, 0x62, 0xa8, 0x38, 0x4e, 0xd4, 0x9b, 0x4f, 0x09, 0x06, 0x5a, 0xd1, 0xab, 0x93,
0xa8, 0x3e, 0x60, 0x12, 0x17, 0xfd, 0x6b, 0xac, 0xf4, 0xef, 0xb0, 0x02, 0x43, 0x93, 0x51, 0x96,
0x6a, 0x15, 0x70, 0x94, 0x00, 0xa1, 0x0e, 0x68, 0x59, 0xea, 0x47, 0x51, 0xd2, 0x6e, 0xf2, 0x34,
0xff, 0xb7, 0x6c, 0x3b, 0x4a, 0x6d, 0xdb, 0x3d, 0x16, 0x2d, 0x69, 0x64, 0xa9, 0x1d, 0x25, 0xd6,
0x2b, 0xd0, 0x71, 0x76, 0xd7, 0xbf, 0xe2, 0x09, 0x58, 0xa0, 0x5d, 0x90, 0x71, 0x96, 0x13, 0xd9,
0x55, 0x90, 0x5b, 0x0f, 0x67, 0x77, 0x58, 0x6a, 0xd0, 0x3e, 0x34, 0x82, 0x71, 0xd9, 0x98, 0x55,
0x13, 0xa1, 0xb0, 0x02, 0x68, 0xe1, 0xec, 0x8e, 0x6f, 0x4a, 0xf4, 0x04, 0x04, 0x22, 0x7e, 0x1a,
0x4c, 0x4a, 0xb8, 0x75, 0x2e, 0x19, 0x06, 0x13, 0x82, 0xbe, 0x06, 0x23, 0xcf, 0xee, 0xfc, 0x90,
0x3f, 0x2f, 0xc6, 0xd6, 0xe8, 0xee, 0xac, 0xb4, 0xb2, 0x4c, 0x0e, 0x43, 0x5e, 0x92, 0x85, 0xf5,
0x0a, 0xe0, 0x79, 0x4c, 0x92, 0xe8, 0x41, 0x8f, 0x7c, 0xc4, 0xe0, 0x23, 0x49, 0x54, 0xc6, 0x5f,
0x93, 0x29, 0xf3, 0x08, 0x58, 0xea, 0xac, 0x3f, 0x6b, 0xa0, 0xbd, 0x11, 0xf1, 0x0e, 0x41, 0xe5,
0x8d, 0x16, 0xbb, 0x7b, 0x77, 0x39, 0x1d, 0x61, 0xc1, 0x5b, 0xcd, 0x6d, 0xd0, 0xff, 0x41, 0xa7,
0xf1, 0x84, 0x14, 0x34, 0x98, 0x4c, 0x39, 0x24, 0x0a, 0x5e, 0x08, 0xd8, 0x59, 0xbb, 0xa4, 0x71,
0xc4, 0x47, 0x46, 0xc7, 0x9c, 0x66, 0x0b, 0x9a, 0xb5, 0x47, 0xe5, 0x22, 0x46, 0xa2, 0xcf, 0x41,
0x67, 0x28, 0xf0, 0x7b, 0xd2, 0x6e, 0x70, 0x58, 0xb7, 0xef, 0x61, 0xc0, 0x9f, 0xc5, 0xad, 0xbc,
0xc4, 0xf5, 0x1b, 0x30, 0x78, 0xde, 0xd2, 0x49, 0xcc, 0xc5, 0xee, 0xea, 0x5c, 0x94, 0xf8, 0x60,
0x18, 0x57, 0xb4, 0xf5, 0x73, 0x1d, 0xd6, 0xdf, 0x88, 0xcf, 0xbb, 0x5c, 0x29, 0xdf, 0xc3, 0x16,
0x19, 0x8f, 0x49, 0x48, 0xe3, 0x19, 0xf1, 0xc3, 0x20, 0x49, 0x48, 0xee, 0xc7, 0x91, 0x1c, 0x81,
0x8d, 0x8e, 0x38, 0xf3, 0x7d, 0x2e, 0x1f, 0xd8, 0x78, 0xb3, 0xb2, 0x95, 0xa2, 0x08, 0x39, 0xb0,
0x15, 0x4f, 0x26, 0x24, 0x8a, 0x03, 0xba, 0x1c, 0x40, 0x0c, 0xc8, 0x8e, 0x44, 0xfb, 0x8d, 0x77,
0x12, 0x50, 0xb2, 0x08, 0x53, 0x79, 0x54, 0x61, 0x3e, 0x66, 0xe3, 0x9f, 0x5f, 0x56, 0x5b, 0xea,
0xb1, 0xf4, 0xf4, 0xb8, 0x10, 0x4b, 0xe5, 0xca, 0x06, 0x54, 0xef, 0x6d, 0xc0, 0xc5, 0x97, 0xd2,
0xf8, 0xd0, 0x97, 0x62, 0x7d, 0x0b, 0x1b, 0x15, 0x10, 0x72, 0xc3, 0x1d, 0x82, 0xc6, 0xf1, 0x2c,
0x97, 0x0a, 0x7a, 0xbf, 0xf5, 0x58, 0x5a, 0x1c, 0x7e, 0x07, 0xc6, 0xd2, 0xe7, 0xc4, 0x2e, 0xde,
0xe0, 0x64, 0x38, 0xc2, 0x8e, 0xf9, 0x08, 0xb5, 0x40, 0x3d, 0xf7, 0x46, 0x67, 0x66, 0x8d, 0x51,
0xce, 0x8f, 0x4e, 0x5f, 0x5c, 0x51, 0x46, 0xf9, 0xd2, 0x48, 0x39, 0xfc, 0xbd, 0x06, 0xb0, 0x98,
0x26, 0x64, 0x40, 0xf3, 0xf5, 0xf0, 0xe5, 0x70, 0xf4, 0x76, 0x28, 0x02, 0x9c, 0x78, 0x03, 0xdb,
0xac, 0x21, 0x1d, 0x1a, 0xe2, 0x2c, 0xd7, 0xd9, 0x0b, 0xf2, 0x26, 0x2b, 0xec, 0x60, 0x57, 0x07,
0x59, 0x45, 0x4d, 0x50, 0xaa, 0xb3, 0x2b, 0xef, 0xac, 0xc6, 0x02, 0x62, 0xe7, 0xcc, 0x3d, 0xee,
0x3b, 0x66, 0x93, 0x29, 0xaa, 0x8b, 0x0b, 0xa0, 0x95, 0xe7, 0x96, 0x79, 0xb2, 0x23, 0x0d, 0xec,
0x9d, 0x91, 0xf7, 0x83, 0x83, 0x4d, 0x83, 0xc9, 0xf0, 0xe8, 0xad, 0xb9, 0xc6, 0x64, 0xcf, 0x07,
0x8e, 0x6b, 0x9b, 0x8f, 0x7b, 0x9f, 0xbe, 0x7b, 0x3a, 0x8b, 0x29, 0x29, 0x8a, 0x4e, 0x9c, 0x1d,
0x09, 0xea, 0xe8, 0x32, 0x3b, 0x9a, 0xd1, 0x23, 0xfe, 0x87, 0x77, 0xb4, 0x80, 0xe9, 0x42, 0xe3,
0x92, 0x2f, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x68, 0xbd, 0x20, 0x05, 0x3d, 0x0a, 0x00, 0x00,
var fileDescriptor_binlogdata_60517ed2deb82a7b = []byte{
// 1215 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x5d, 0x6e, 0xdb, 0x46,
0x10, 0x8e, 0x44, 0x8a, 0x12, 0x87, 0x8e, 0x4d, 0xaf, 0x7f, 0x2a, 0x18, 0x0d, 0xe0, 0x12, 0x6d,
0xe3, 0x1a, 0xa8, 0x9c, 0xaa, 0x7f, 0x4f, 0x6d, 0x21, 0x89, 0x8c, 0xa3, 0x84, 0x96, 0x9c, 0x35,
0x93, 0x14, 0x79, 0x21, 0x68, 0x72, 0x65, 0x13, 0xa6, 0x48, 0x85, 0x5c, 0xdb, 0xd5, 0x09, 0x7a,
0x80, 0xbe, 0xf6, 0x02, 0x3d, 0x42, 0x2f, 0xd0, 0x9b, 0xf4, 0x1e, 0xc5, 0xfe, 0x90, 0x92, 0x1c,
0xa0, 0x71, 0x1f, 0xfa, 0x36, 0xff, 0x3b, 0xf3, 0xcd, 0x70, 0x86, 0x60, 0x9e, 0xc7, 0x69, 0x92,
0x5d, 0x44, 0x01, 0x0d, 0x3a, 0xb3, 0x3c, 0xa3, 0x19, 0x82, 0x85, 0x64, 0xcf, 0xb8, 0xa1, 0xf9,
0x2c, 0x14, 0x8a, 0x3d, 0xe3, 0xdd, 0x35, 0xc9, 0xe7, 0x92, 0x59, 0xa7, 0xd9, 0x2c, 0x5b, 0x78,
0x59, 0x27, 0xd0, 0x1c, 0x5c, 0x06, 0x79, 0x41, 0x28, 0xda, 0x05, 0x2d, 0x4c, 0x62, 0x92, 0xd2,
0x76, 0x6d, 0xbf, 0x76, 0xd0, 0xc0, 0x92, 0x43, 0x08, 0xd4, 0x30, 0x4b, 0xd3, 0x76, 0x9d, 0x4b,
0x39, 0xcd, 0x6c, 0x0b, 0x92, 0xdf, 0x90, 0xbc, 0xad, 0x08, 0x5b, 0xc1, 0x59, 0x7f, 0x2b, 0xb0,
0xd9, 0xe7, 0x79, 0x78, 0x79, 0x90, 0x16, 0x41, 0x48, 0xe3, 0x2c, 0x45, 0xc7, 0x00, 0x05, 0x0d,
0x28, 0x99, 0x92, 0x94, 0x16, 0xed, 0xda, 0xbe, 0x72, 0x60, 0x74, 0x1f, 0x77, 0x96, 0x2a, 0x78,
0xcf, 0xa5, 0x73, 0x56, 0xda, 0xe3, 0x25, 0x57, 0xd4, 0x05, 0x83, 0xdc, 0x90, 0x94, 0xfa, 0x34,
0xbb, 0x22, 0x69, 0x5b, 0xdd, 0xaf, 0x1d, 0x18, 0xdd, 0xcd, 0x8e, 0x28, 0xd0, 0x61, 0x1a, 0x8f,
0x29, 0x30, 0x90, 0x8a, 0xde, 0xfb, 0xab, 0x0e, 0x7a, 0x15, 0x0d, 0xb9, 0xd0, 0x0a, 0x03, 0x4a,
0x2e, 0xb2, 0x7c, 0xce, 0xcb, 0x5c, 0xef, 0x3e, 0xb9, 0x67, 0x22, 0x9d, 0x81, 0xf4, 0xc3, 0x55,
0x04, 0xf4, 0x25, 0x34, 0x43, 0x81, 0x1e, 0x47, 0xc7, 0xe8, 0x6e, 0x2d, 0x07, 0x93, 0xc0, 0xe2,
0xd2, 0x06, 0x99, 0xa0, 0x14, 0xef, 0x12, 0x0e, 0xd9, 0x1a, 0x66, 0xa4, 0xf5, 0x47, 0x0d, 0x5a,
0x65, 0x5c, 0xb4, 0x05, 0x1b, 0x7d, 0xd7, 0x7f, 0x35, 0xc2, 0xce, 0x60, 0x7c, 0x3c, 0x1a, 0xbe,
0x75, 0x6c, 0xf3, 0x01, 0x5a, 0x83, 0x56, 0xdf, 0xf5, 0xfb, 0xce, 0xf1, 0x70, 0x64, 0xd6, 0xd0,
0x43, 0xd0, 0xfb, 0xae, 0x3f, 0x18, 0x9f, 0x9c, 0x0c, 0x3d, 0xb3, 0x8e, 0x36, 0xc0, 0xe8, 0xbb,
0x3e, 0x1e, 0xbb, 0x6e, 0xbf, 0x37, 0x78, 0x61, 0x2a, 0x68, 0x07, 0x36, 0xfb, 0xae, 0x6f, 0x9f,
0xb8, 0xbe, 0xed, 0x9c, 0x62, 0x67, 0xd0, 0xf3, 0x1c, 0xdb, 0x54, 0x11, 0x80, 0xc6, 0xc4, 0xb6,
0x6b, 0x36, 0x24, 0x7d, 0xe6, 0x78, 0xa6, 0x26, 0xc3, 0x0d, 0x47, 0x67, 0x0e, 0xf6, 0xcc, 0xa6,
0x64, 0x5f, 0x9d, 0xda, 0x3d, 0xcf, 0x31, 0x5b, 0x92, 0xb5, 0x1d, 0xd7, 0xf1, 0x1c, 0x53, 0x7f,
0xae, 0xb6, 0xea, 0xa6, 0xf2, 0x5c, 0x6d, 0x29, 0xa6, 0x6a, 0xfd, 0x56, 0x83, 0x9d, 0x33, 0x9a,
0x93, 0x60, 0xfa, 0x82, 0xcc, 0x71, 0x90, 0x5e, 0x10, 0x4c, 0xde, 0x5d, 0x93, 0x82, 0xa2, 0x3d,
0x68, 0xcd, 0xb2, 0x22, 0x66, 0xd8, 0x71, 0x80, 0x75, 0x5c, 0xf1, 0xe8, 0x08, 0xf4, 0x2b, 0x32,
0xf7, 0x73, 0x66, 0x2f, 0x01, 0x43, 0x9d, 0x6a, 0x20, 0xab, 0x48, 0xad, 0x2b, 0x49, 0x2d, 0xe3,
0xab, 0x7c, 0x18, 0x5f, 0x6b, 0x02, 0xbb, 0x77, 0x93, 0x2a, 0x66, 0x59, 0x5a, 0x10, 0xe4, 0x02,
0x12, 0x8e, 0x3e, 0x5d, 0xf4, 0x96, 0xe7, 0x67, 0x74, 0x1f, 0xfd, 0xeb, 0x00, 0xe0, 0xcd, 0xf3,
0xbb, 0x22, 0xeb, 0x17, 0xd8, 0x12, 0xef, 0x78, 0xc1, 0x79, 0x42, 0x8a, 0xfb, 0x94, 0xbe, 0x0b,
0x1a, 0xe5, 0xc6, 0xed, 0xfa, 0xbe, 0x72, 0xa0, 0x63, 0xc9, 0xfd, 0xd7, 0x0a, 0x23, 0xd8, 0x5e,
0x7d, 0xf9, 0x7f, 0xa9, 0xef, 0x1b, 0x50, 0xf1, 0x75, 0x42, 0xd0, 0x36, 0x34, 0xa6, 0x01, 0x0d,
0x2f, 0x65, 0x35, 0x82, 0x61, 0xa5, 0x4c, 0xe2, 0x84, 0x92, 0x9c, 0xb7, 0x50, 0xc7, 0x92, 0xb3,
0x9e, 0x80, 0xf6, 0x94, 0x53, 0xe8, 0x73, 0x68, 0xe4, 0xd7, 0xac, 0x56, 0xf1, 0xa9, 0x9b, 0xcb,
0x09, 0xb0, 0xc0, 0x58, 0xa8, 0xad, 0xdf, 0xeb, 0xb0, 0x26, 0x12, 0x3a, 0xcb, 0xae, 0xf3, 0x90,
0x30, 0x04, 0xaf, 0xc8, 0xbc, 0x98, 0x05, 0x21, 0x29, 0x11, 0x2c, 0x79, 0x96, 0x4c, 0x71, 0x19,
0xe4, 0x91, 0x7c, 0x55, 0x30, 0xe8, 0x5b, 0x30, 0x38, 0x92, 0xd4, 0xa7, 0xf3, 0x19, 0xe1, 0x18,
0xae, 0x77, 0xb7, 0x17, 0x43, 0xc5, 0x71, 0xa2, 0xde, 0x7c, 0x46, 0x30, 0xd0, 0x8a, 0x5e, 0x9d,
0x44, 0xf5, 0x1e, 0x93, 0xb8, 0xe8, 0x5f, 0x63, 0xa5, 0x7f, 0x87, 0x15, 0x18, 0x9a, 0x8c, 0xb2,
0x54, 0xab, 0x80, 0xa3, 0x04, 0x08, 0x75, 0x40, 0xcb, 0x52, 0x3f, 0x8a, 0x92, 0x76, 0x93, 0xa7,
0xf9, 0xd1, 0xb2, 0xed, 0x38, 0xb5, 0x6d, 0xb7, 0x27, 0x5a, 0xd2, 0xc8, 0x52, 0x3b, 0x4a, 0xac,
0x97, 0xa0, 0xe3, 0xec, 0x76, 0x70, 0xc9, 0x13, 0xb0, 0x40, 0x3b, 0x27, 0x93, 0x2c, 0x27, 0xb2,
0xab, 0x20, 0xb7, 0x1e, 0xce, 0x6e, 0xb1, 0xd4, 0xa0, 0x7d, 0x68, 0x04, 0x93, 0xb2, 0x31, 0xab,
0x26, 0x42, 0x61, 0x05, 0xd0, 0xc2, 0xd9, 0x2d, 0xdf, 0x94, 0xe8, 0x11, 0x08, 0x44, 0xfc, 0x34,
0x98, 0x96, 0x70, 0xeb, 0x5c, 0x32, 0x0a, 0xa6, 0x04, 0x7d, 0x07, 0x46, 0x9e, 0xdd, 0xfa, 0x21,
0x7f, 0x5e, 0x8c, 0xad, 0xd1, 0xdd, 0x59, 0x69, 0x65, 0x99, 0x1c, 0x86, 0xbc, 0x24, 0x0b, 0xeb,
0x25, 0xc0, 0xd3, 0x98, 0x24, 0xd1, 0xbd, 0x1e, 0xf9, 0x94, 0xc1, 0x47, 0x92, 0xa8, 0x8c, 0xbf,
0x26, 0x53, 0xe6, 0x11, 0xb0, 0xd4, 0x59, 0xbf, 0xd6, 0x41, 0x7b, 0x2d, 0xe2, 0x1d, 0x82, 0xca,
0x1b, 0x2d, 0x76, 0xf7, 0xee, 0x72, 0x3a, 0xc2, 0x82, 0xb7, 0x9a, 0xdb, 0xa0, 0x8f, 0x41, 0xa7,
0xf1, 0x94, 0x14, 0x34, 0x98, 0xce, 0x38, 0x24, 0x0a, 0x5e, 0x08, 0xd8, 0x59, 0xbb, 0xa0, 0x71,
0xc4, 0x47, 0x46, 0xc7, 0x9c, 0x66, 0x0b, 0x9a, 0xb5, 0x47, 0xe5, 0x22, 0x46, 0xa2, 0xaf, 0x40,
0x67, 0x28, 0xf0, 0x7b, 0xd2, 0x6e, 0x70, 0x58, 0xb7, 0xef, 0x60, 0xc0, 0x9f, 0xc5, 0xad, 0xbc,
0xc4, 0xf5, 0x7b, 0x30, 0x78, 0xde, 0xd2, 0x49, 0xcc, 0xc5, 0xee, 0xea, 0x5c, 0x94, 0xf8, 0x60,
0x98, 0x2c, 0xb0, 0xfa, 0x04, 0xd6, 0xc2, 0xeb, 0x3c, 0xe7, 0xf7, 0x2d, 0x9e, 0x92, 0xf6, 0x36,
0x4f, 0xd9, 0x90, 0x32, 0x2f, 0x9e, 0x12, 0x86, 0xc4, 0xfa, 0x6b, 0xb1, 0x01, 0xca, 0xad, 0xf3,
0x13, 0x6c, 0x91, 0xc9, 0x84, 0x84, 0x34, 0xbe, 0x21, 0x7e, 0x18, 0x24, 0x09, 0xc9, 0xfd, 0x38,
0x92, 0x53, 0xb2, 0xd1, 0x11, 0x7f, 0x02, 0x03, 0x2e, 0x1f, 0xda, 0x78, 0xb3, 0xb2, 0x95, 0xa2,
0x08, 0x39, 0xb0, 0x15, 0x4f, 0xa7, 0x24, 0x8a, 0x03, 0xba, 0x1c, 0x40, 0xcc, 0xd0, 0x8e, 0x6c,
0xc8, 0x6b, 0xef, 0x38, 0xa0, 0x64, 0x11, 0xa6, 0xf2, 0xa8, 0xc2, 0x7c, 0xc6, 0xbe, 0x90, 0xfc,
0xa2, 0x5a, 0x64, 0x0f, 0xa5, 0xa7, 0xc7, 0x85, 0x58, 0x2a, 0x57, 0x96, 0xa4, 0x7a, 0x67, 0x49,
0x2e, 0x3e, 0xa6, 0xc6, 0x87, 0x3e, 0x26, 0xeb, 0x07, 0xd8, 0xa8, 0x80, 0x90, 0x4b, 0xf0, 0x10,
0x34, 0x0e, 0x79, 0xb9, 0x77, 0xd0, 0xfb, 0xd3, 0x81, 0xa5, 0xc5, 0xe1, 0x8f, 0x60, 0x2c, 0x7d,
0x71, 0xec, 0x28, 0x0e, 0x8f, 0x47, 0x63, 0xec, 0x98, 0x0f, 0x50, 0x0b, 0xd4, 0x33, 0x6f, 0x7c,
0x6a, 0xd6, 0x18, 0xe5, 0xfc, 0xec, 0x0c, 0xc4, 0xa1, 0x65, 0x94, 0x2f, 0x8d, 0x94, 0xc3, 0x3f,
0x6b, 0x00, 0x8b, 0x81, 0x43, 0x06, 0x34, 0x5f, 0x8d, 0x5e, 0x8c, 0xc6, 0x6f, 0x46, 0x22, 0xc0,
0xb1, 0x37, 0xb4, 0xcd, 0x1a, 0xd2, 0xa1, 0x21, 0x2e, 0x77, 0x9d, 0xbd, 0x20, 0xcf, 0xb6, 0xc2,
0x6e, 0x7a, 0x75, 0xb3, 0x55, 0xd4, 0x04, 0xa5, 0xba, 0xcc, 0xf2, 0x14, 0x6b, 0x2c, 0x20, 0x76,
0x4e, 0xdd, 0xde, 0xc0, 0x31, 0x9b, 0x4c, 0x51, 0x1d, 0x65, 0x00, 0xad, 0xbc, 0xc8, 0xcc, 0x93,
0xdd, 0x71, 0x60, 0xef, 0x8c, 0xbd, 0x67, 0x0e, 0x36, 0x0d, 0x26, 0xc3, 0xe3, 0x37, 0xe6, 0x1a,
0x93, 0x3d, 0x1d, 0x3a, 0xae, 0x6d, 0x3e, 0x64, 0x87, 0xfc, 0x99, 0xd3, 0xc3, 0x5e, 0xdf, 0xe9,
0x79, 0xe6, 0x7a, 0xff, 0x8b, 0xb7, 0x8f, 0x6f, 0x62, 0x4a, 0x8a, 0xa2, 0x13, 0x67, 0x47, 0x82,
0x3a, 0xba, 0xc8, 0x8e, 0x6e, 0xe8, 0x11, 0xff, 0x27, 0x3c, 0x5a, 0xa0, 0x76, 0xae, 0x71, 0xc9,
0xd7, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x0e, 0xe3, 0x8a, 0xa5, 0x6f, 0x0a, 0x00, 0x00,
}

Просмотреть файл

@ -38,7 +38,6 @@ type relayLog struct {
curSize int
items [][]*binlogdatapb.VEvent
timedout bool
err error
// canAccept is true if: curSize<=maxSize, len(items)<maxItems, and ctx is not Done.
canAccept sync.Cond
// hasItems is true if len(items)>0, ctx is not Done, and interuptFetch is false.

Просмотреть файл

@ -39,7 +39,10 @@ import (
)
var (
idleTimeout = 1 * time.Second
// idleTimeout is set to slightly above 1s, compared to heartbeatTime
// set by VStreamer at slightly below 1s. This minimizes conflicts
// between the two timeouts.
idleTimeout = 1100 * time.Millisecond
dbLockRetryDelay = 1 * time.Second
relayLogMaxSize = 10000
relayLogMaxItems = 1000
@ -61,7 +64,11 @@ type vplayer struct {
unsavedGTID *binlogdatapb.VEvent
// timeLastSaved is set every time a GTID is saved.
timeLastSaved time.Time
stopPos mysql.Position
// lastTimestampNs is the last timestamp seen so far.
lastTimestampNs int64
// timeOffsetNs keeps track of the clock difference with respect to source tablet.
timeOffsetNs int64
stopPos mysql.Position
// pplan is built based on the source Filter at the beginning.
pplan *PlayerPlan
@ -197,6 +204,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err != nil {
return err
}
// No events were received. This likely means that there's a network partition.
// So, we should assume we're falling behind.
if len(items) == 0 {
behind := time.Now().UnixNano() - vp.lastTimestampNs - vp.timeOffsetNs
vp.stats.SecondsBehindMaster.Set(behind / 1e9)
}
// Filtered replication often ends up receiving a large number of empty transactions.
// This is required because the player needs to know the latest position of the source.
// This allows it to stop at that position if requested.
@ -221,6 +234,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
}
for i, events := range items {
for j, event := range events {
if event.Timestamp != 0 {
vp.lastTimestampNs = event.Timestamp * 1e9
vp.timeOffsetNs = time.Now().UnixNano() - event.CurrentTime
vp.stats.SecondsBehindMaster.Set(event.CurrentTime/1e9 - event.Timestamp)
}
mustSave := false
switch event.Type {
case binlogdatapb.VEventType_COMMIT:
@ -354,6 +372,8 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
return err
}
}
case binlogdatapb.VEventType_HEARTBEAT:
// No-op: heartbeat timings are calculated in outer loop.
}
return nil
}
@ -444,9 +464,6 @@ func (vp *vplayer) updatePos(ts int64) error {
vp.unsavedGTID = nil
vp.timeLastSaved = time.Now()
vp.stats.SetLastPosition(vp.pos)
if ts != 0 {
vp.stats.SecondsBehindMaster.Set(vp.timeLastSaved.Unix() - ts)
}
return nil
}

Просмотреть файл

@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"io"
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
@ -36,6 +37,11 @@ import (
var packetSize = flag.Int("vstream_packet_size", 10000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.")
// heartbeatTime is set to slightly below 1s, compared to idleTimeout
// set by VPlayer at slightly above 1s. This minimizes conflicts
// between the two timeouts.
var heartbeatTime = 900 * time.Millisecond
type vstreamer struct {
ctx context.Context
cancel func()
@ -132,9 +138,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
// We never have to send GTID, BEGIN or FIELD events on their own.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL:
// COMMIT and DDL are terminal. There may be no more events after
// these for a long time. So, we have to send whatever we have.
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL and HEARTBEAT must be immediately sent.
bufferedEvents = append(bufferedEvents, vevent)
vevents := bufferedEvents
bufferedEvents = nil
@ -167,7 +172,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
// Main loop: calls bufferAndTransmit as events arrive.
timer := time.NewTimer(heartbeatTime)
defer timer.Stop()
for {
timer.Reset(heartbeatTime)
// Drain event if timer fired before reset.
select {
case <-timer.C:
default:
}
select {
case ev, ok := <-events:
if !ok {
@ -196,6 +210,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
case <-ctx.Done():
return nil
case <-timer.C:
now := time.Now().UnixNano()
if err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
}); err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("error sending event: %v", err)
}
}
}
}
@ -392,6 +418,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
vevent.CurrentTime = time.Now().UnixNano()
}
return vevents, nil
}

Просмотреть файл

@ -919,6 +919,8 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
t.Fatalf("%v: evs\n%v, want\n%v", input, evs, wantset)
}
for i, want := range wantset {
// CurrentTime is not testable.
evs[i].CurrentTime = 0
switch want {
case "gtid|begin":
if evs[i].Type != binlogdatapb.VEventType_GTID && evs[i].Type != binlogdatapb.VEventType_BEGIN {

Просмотреть файл

@ -185,6 +185,7 @@ enum VEventType {
OTHER = 11;
ROW = 12;
FIELD = 13;
HEARTBEAT = 14;
}
// RowChange represents one row change
@ -212,6 +213,8 @@ message VEvent {
string ddl = 4;
RowEvent row_event = 5;
FieldEvent field_event = 6;
// current_time specifies the current time to handle clock skew.
int64 current_time = 20;
}
// VStreamRequest is the payload for VStream

Просмотреть файл

@ -23,7 +23,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='binlogdata',
syntax='proto3',
serialized_options=_b('Z\'vitess.io/vitess/go/vt/proto/binlogdata'),
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\"\xb2\x01\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xaa\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\rB)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
serialized_pb=_b('\n\x10\x62inlogdata.proto\x12\nbinlogdata\x1a\x0bvtrpc.proto\x1a\x0bquery.proto\x1a\x0etopodata.proto\"7\n\x07\x43harset\x12\x0e\n\x06\x63lient\x18\x01 \x01(\x05\x12\x0c\n\x04\x63onn\x18\x02 \x01(\x05\x12\x0e\n\x06server\x18\x03 \x01(\x05\"\xb5\x03\n\x11\x42inlogTransaction\x12;\n\nstatements\x18\x01 \x03(\x0b\x32\'.binlogdata.BinlogTransaction.Statement\x12&\n\x0b\x65vent_token\x18\x04 \x01(\x0b\x32\x11.query.EventToken\x1a\xae\x02\n\tStatement\x12\x42\n\x08\x63\x61tegory\x18\x01 \x01(\x0e\x32\x30.binlogdata.BinlogTransaction.Statement.Category\x12$\n\x07\x63harset\x18\x02 \x01(\x0b\x32\x13.binlogdata.Charset\x12\x0b\n\x03sql\x18\x03 \x01(\x0c\"\xa9\x01\n\x08\x43\x61tegory\x12\x13\n\x0f\x42L_UNRECOGNIZED\x10\x00\x12\x0c\n\x08\x42L_BEGIN\x10\x01\x12\r\n\tBL_COMMIT\x10\x02\x12\x0f\n\x0b\x42L_ROLLBACK\x10\x03\x12\x15\n\x11\x42L_DML_DEPRECATED\x10\x04\x12\n\n\x06\x42L_DDL\x10\x05\x12\n\n\x06\x42L_SET\x10\x06\x12\r\n\tBL_INSERT\x10\x07\x12\r\n\tBL_UPDATE\x10\x08\x12\r\n\tBL_DELETE\x10\tJ\x04\x08\x02\x10\x03J\x04\x08\x03\x10\x04\"v\n\x15StreamKeyRangeRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12%\n\tkey_range\x18\x02 \x01(\x0b\x32\x12.topodata.KeyRange\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"S\n\x16StreamKeyRangeResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"]\n\x13StreamTablesRequest\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x0e\n\x06tables\x18\x02 \x03(\t\x12$\n\x07\x63harset\x18\x03 \x01(\x0b\x32\x13.binlogdata.Charset\"Q\n\x14StreamTablesResponse\x12\x39\n\x12\x62inlog_transaction\x18\x01 \x01(\x0b\x32\x1d.binlogdata.BinlogTransaction\"%\n\x04Rule\x12\r\n\x05match\x18\x01 \x01(\t\x12\x0e\n\x06\x66ilter\x18\x02 \x01(\t\")\n\x06\x46ilter\x12\x1f\n\x05rules\x18\x01 \x03(\x0b\x32\x10.binlogdata.Rule\"\xde\x01\n\x0c\x42inlogSource\x12\x10\n\x08keyspace\x18\x01 \x01(\t\x12\r\n\x05shard\x18\x02 \x01(\t\x12)\n\x0btablet_type\x18\x03 \x01(\x0e\x32\x14.topodata.TabletType\x12%\n\tkey_range\x18\x04 \x01(\x0b\x32\x12.topodata.KeyRange\x12\x0e\n\x06tables\x18\x05 \x03(\t\x12\"\n\x06\x66ilter\x18\x06 \x01(\x0b\x32\x12.binlogdata.Filter\x12\'\n\x06on_ddl\x18\x07 \x01(\x0e\x32\x17.binlogdata.OnDDLAction\"B\n\tRowChange\x12\x1a\n\x06\x62\x65\x66ore\x18\x01 \x01(\x0b\x32\n.query.Row\x12\x19\n\x05\x61\x66ter\x18\x02 \x01(\x0b\x32\n.query.Row\"J\n\x08RowEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12*\n\x0brow_changes\x18\x02 \x03(\x0b\x32\x15.binlogdata.RowChange\">\n\nFieldEvent\x12\x12\n\ntable_name\x18\x01 \x01(\t\x12\x1c\n\x06\x66ields\x18\x02 \x03(\x0b\x32\x0c.query.Field\"\xc8\x01\n\x06VEvent\x12$\n\x04type\x18\x01 \x01(\x0e\x32\x16.binlogdata.VEventType\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\x12\x0c\n\x04gtid\x18\x03 \x01(\t\x12\x0b\n\x03\x64\x64l\x18\x04 \x01(\t\x12\'\n\trow_event\x18\x05 \x01(\x0b\x32\x14.binlogdata.RowEvent\x12+\n\x0b\x66ield_event\x18\x06 \x01(\x0b\x32\x16.binlogdata.FieldEvent\x12\x14\n\x0c\x63urrent_time\x18\x14 \x01(\x03\"\xc7\x01\n\x0eVStreamRequest\x12,\n\x13\x65\x66\x66\x65\x63tive_caller_id\x18\x01 \x01(\x0b\x32\x0f.vtrpc.CallerID\x12\x32\n\x13immediate_caller_id\x18\x02 \x01(\x0b\x32\x15.query.VTGateCallerID\x12\x1d\n\x06target\x18\x03 \x01(\x0b\x32\r.query.Target\x12\x10\n\x08position\x18\x04 \x01(\t\x12\"\n\x06\x66ilter\x18\x05 \x01(\x0b\x32\x12.binlogdata.Filter\"5\n\x0fVStreamResponse\x12\"\n\x06\x65vents\x18\x01 \x03(\x0b\x32\x12.binlogdata.VEvent*>\n\x0bOnDDLAction\x12\n\n\x06IGNORE\x10\x00\x12\x08\n\x04STOP\x10\x01\x12\x08\n\x04\x45XEC\x10\x02\x12\x0f\n\x0b\x45XEC_IGNORE\x10\x03*\xb9\x01\n\nVEventType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x08\n\x04GTID\x10\x01\x12\t\n\x05\x42\x45GIN\x10\x02\x12\n\n\x06\x43OMMIT\x10\x03\x12\x0c\n\x08ROLLBACK\x10\x04\x12\x07\n\x03\x44\x44L\x10\x05\x12\n\n\x06INSERT\x10\x06\x12\x0b\n\x07REPLACE\x10\x07\x12\n\n\x06UPDATE\x10\x08\x12\n\n\x06\x44\x45LETE\x10\t\x12\x07\n\x03SET\x10\n\x12\t\n\x05OTHER\x10\x0b\x12\x07\n\x03ROW\x10\x0c\x12\t\n\x05\x46IELD\x10\r\x12\r\n\tHEARTBEAT\x10\x0e\x42)Z\'vitess.io/vitess/go/vt/proto/binlogdatab\x06proto3')
,
dependencies=[vtrpc__pb2.DESCRIPTOR,query__pb2.DESCRIPTOR,topodata__pb2.DESCRIPTOR,])
@ -52,8 +52,8 @@ _ONDDLACTION = _descriptor.EnumDescriptor(
],
containing_type=None,
serialized_options=None,
serialized_start=1907,
serialized_end=1969,
serialized_start=1929,
serialized_end=1991,
)
_sym_db.RegisterEnumDescriptor(_ONDDLACTION)
@ -120,11 +120,15 @@ _VEVENTTYPE = _descriptor.EnumDescriptor(
name='FIELD', index=13, number=13,
serialized_options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='HEARTBEAT', index=14, number=14,
serialized_options=None,
type=None),
],
containing_type=None,
serialized_options=None,
serialized_start=1972,
serialized_end=2142,
serialized_start=1994,
serialized_end=2179,
)
_sym_db.RegisterEnumDescriptor(_VEVENTTYPE)
@ -147,6 +151,7 @@ SET = 10
OTHER = 11
ROW = 12
FIELD = 13
HEARTBEAT = 14
_BINLOGTRANSACTION_STATEMENT_CATEGORY = _descriptor.EnumDescriptor(
@ -789,6 +794,13 @@ _VEVENT = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='current_time', full_name='binlogdata.VEvent.current_time', index=6,
number=20, type=3, cpp_type=2, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
@ -802,7 +814,7 @@ _VEVENT = _descriptor.Descriptor(
oneofs=[
],
serialized_start=1470,
serialized_end=1648,
serialized_end=1670,
)
@ -860,8 +872,8 @@ _VSTREAMREQUEST = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1651,
serialized_end=1850,
serialized_start=1673,
serialized_end=1872,
)
@ -891,8 +903,8 @@ _VSTREAMRESPONSE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=1852,
serialized_end=1905,
serialized_start=1874,
serialized_end=1927,
)
_BINLOGTRANSACTION_STATEMENT.fields_by_name['category'].enum_type = _BINLOGTRANSACTION_STATEMENT_CATEGORY