diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 804e5f414e..7078b5a9ea 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -599,7 +599,8 @@ func (m *StreamTablesResponse) GetBinlogTransaction() *BinlogTransaction { // Rule represents one rule. type Rule struct { // match can be a table name or a regular expression - // delineated by '/' and '/'. + // if it starts with a '/'. For example, "/.*" matches + // all tables. Match string `protobuf:"bytes,1,opt,name=match,proto3" json:"match,omitempty"` // filter can be an empty string or keyrange if the match // is a regular expression. Otherwise, it must be a select @@ -699,8 +700,8 @@ func (m *Filter) GetFieldEventMode() Filter_FieldEventMode { } // BinlogSource specifies the source and filter parameters for -// Filtered Replication. It currently supports a keyrange -// or a list of tables. +// Filtered Replication. KeyRange and Tables are legacy. Filter +// is the new way to specify the filtering rules. type BinlogSource struct { // the source keyspace Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` @@ -816,7 +817,10 @@ func (m *BinlogSource) GetStopAfterCopy() bool { return false } -// RowChange represents one row change +// RowChange represents one row change. +// If Before is set and not After, it's a delete. +// If After is set and not Before, it's an insert. +// If both are set, it's an update. type RowChange struct { Before *query.Row `protobuf:"bytes,1,opt,name=before,proto3" json:"before,omitempty"` After *query.Row `protobuf:"bytes,2,opt,name=after,proto3" json:"after,omitempty"` @@ -864,7 +868,7 @@ func (m *RowChange) GetAfter() *query.Row { return nil } -// RowEvent represent row events for one table +// RowEvent represent row events for one table. 
type RowEvent struct { TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"` RowChanges []*RowChange `protobuf:"bytes,2,rep,name=row_changes,json=rowChanges,proto3" json:"row_changes,omitempty"` @@ -912,6 +916,7 @@ func (m *RowEvent) GetRowChanges() []*RowChange { return nil } +// FieldEvent represents the field info for a table. type FieldEvent struct { TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"` Fields []*query.Field `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"` @@ -959,6 +964,11 @@ func (m *FieldEvent) GetFields() []*query.Field { return nil } +// ShardGtid contains the GTID position for one shard. +// It's used in a request for requesting a starting position. +// It's used in a response to transmit the current position +// of a shard. It's also used in a Journal to indicate the +// list of targets and shard positions to migrate to. type ShardGtid struct { Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"` @@ -1014,6 +1024,7 @@ func (m *ShardGtid) GetGtid() string { return "" } +// A VGtid is a list of ShardGtids. type VGtid struct { ShardGtids []*ShardGtid `protobuf:"bytes,1,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -1053,6 +1064,7 @@ func (m *VGtid) GetShardGtids() []*ShardGtid { return nil } +// KeyspaceShard represents a keyspace and shard. type KeyspaceShard struct { Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"` @@ -1100,17 +1112,25 @@ func (m *KeyspaceShard) GetShard() string { return "" } +// Journal contains the metadata for a journal event. 
type Journal struct { - Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"` - Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"` - LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` - ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` - Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"` - SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // Id represents a unique journal id. + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"` + // Tables is set if the journal represents a TABLES migration. + Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"` + // LocalPosition is the source position at which the migration happened. + LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` + // ShardGtids is the list of targets to which the migration took place. + ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` + // Participants is the list of source participants for a migration. 
+ Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"` + // SourceWorkflows is the list of workflows in the source shard that need + // to be migrated to the target. + SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Journal) Reset() { *m = Journal{} } @@ -1187,18 +1207,32 @@ func (m *Journal) GetSourceWorkflows() []string { return nil } -// VEvent represents a vstream event +// VEvent represents a vstream event. +// A FieldEvent is sent once for every table, just before +// the first event for that table. The client is expected +// to cache this information and match it against the RowEvent +// which contains the table name. type VEvent struct { - Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` - Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` - RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` + Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` + // Timestamp is the binlog timestamp in seconds. + // It's set for all events. + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Gtid is set if the event type is GTID. + Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` + // Ddl is set if the event type is DDL. + Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` + // RowEvent is set if the event type is ROW. 
+ RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` + // FieldEvent is set if the event type is FIELD. FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"` - Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"` - Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"` - Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"` - // current_time specifies the current time to handle clock skew. + // Vgtid is set if the event type is VGTID. + // This event is only generated by VTGate's VStream function. + Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"` + // Journal is set if the event type is JOURNAL. + Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"` + // Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE. + Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"` + // CurrentTime specifies the current time to handle clock skew. CurrentTime int64 `protobuf:"varint,20,opt,name=current_time,json=currentTime,proto3" json:"current_time,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index cc482dc5e2..6c993a5a10 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -64,6 +64,7 @@ func NewVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, // TODO(sougou): find a better way for this. var vschemaUpdateCount sync2.AtomicInt64 +// vstreamer is for serving a single vreplication stream on the source side. 
type vstreamer struct { ctx context.Context cancel func() @@ -85,12 +86,25 @@ type vstreamer struct { } // streamerPlan extends the original plan to also include -// the TableMap which is used to extract values from the binlog events. +// the TableMap, which comes from the binlog. It's used +// to extract values from the ROW events. type streamerPlan struct { *Plan TableMap *mysql.TableMap } +// newVStreamer creates a new vstreamer. +// cp: the mysql conn params. +// se: the schema engine. The vstreamer uses it to convert the TableMap into field info. +// startPos: a flavor compliant position to stream from. This can also contain the special +// value "current", which means start from the current position. +// filter: the list of filtering rules. If a rule has a select expression for its filter, +// the select list can only reference direct columns. No other expressions are allowed. +// The select expression is allowed to contain the special 'keyspace_id()' function which +// will return the keyspace id of the row. For more info, see the documentation +// for binlogdatapb.Filter. +// vschema: the current vschema. This value can later be changed through the SetVSchema method. +// send: callback function to send events. func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ @@ -107,7 +121,7 @@ func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, } } -// SetVSchema updates all existing streams against the new vschema. +// SetVSchema updates the vstreamer against the new vschema. func (vs *vstreamer) SetVSchema(vschema *localVSchema) { // Since vs.Stream is a single-threaded loop. We just send an event to // that thread, which helps us avoid mutexes to update the plans. 
@@ -117,14 +131,16 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) { } } +// Cancel stops the streaming. func (vs *vstreamer) Cancel() { vs.cancel() } -// Stream runs a single-threaded loop. +// Stream streams binlog events. func (vs *vstreamer) Stream() error { defer vs.cancel() + // Validate the request against the current position. curPos, err := vs.currentPosition() if err != nil { return vterrors.Wrap(err, "could not obtain current position") @@ -171,13 +187,22 @@ func (vs *vstreamer) currentPosition() (mysql.Position, error) { return conn.MasterPosition() } +// parseEvents parses and sends events. func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.BinlogEvent) error { // bufferAndTransmit uses bufferedEvents and curSize to buffer events. var ( bufferedEvents []*binlogdatapb.VEvent curSize int ) - // Buffering only takes row lengths into consideration. + // Only the following patterns are possible: + // BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks. + // BEGIN->JOURNAL->GTID->COMMIT + // GTID->DDL + // GTID->OTHER + // HEARTBEAT is issued if there's inactivity, which is likely + // to happen between one group of events and another. + // + // Buffering only takes row or statement lengths into consideration. // Length of other events is considered negligible. // If a new row event causes the packet size to be exceeded, // all existing rows are sent without the new row. @@ -186,9 +211,14 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog switch vevent.Type { case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, binlogdatapb.VEventType_JOURNAL: // We never have to send GTID, BEGIN, FIELD events on their own. + // A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT. + // So, we don't have to send it right away. 
bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT: // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. + // Although unlikely, it's possible to get a HEARTBEAT in the middle + // of a transaction. If so, we still send the partial transaction along + // with the heartbeat. bufferedEvents = append(bufferedEvents, vevent) vevents := bufferedEvents bufferedEvents = nil @@ -287,8 +317,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } +// parseEvent parses an event from the binlog and converts it to a list of VEvents. func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { - // Validate the buffer before reading fields from it. if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -428,6 +458,11 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } case ev.IsTableMap(): // This is very frequent. It precedes every row event. + // If it's the first time for a table, we generate a FIELD + // event, and also cache the plan. Subsequent TableMap events + // for that table id don't generate VEvents. + // A schema change will result in a change in table id, which + // will generate a new plan and FIELD event. id := ev.TableID(vs.format) if _, ok := vs.plans[id]; ok { return nil, nil @@ -437,6 +472,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e return nil, err } if tm.Database == "_vt" && tm.Name == "resharding_journal" { + // A journal is a special case that generates a JOURNAL event. 
return nil, vs.buildJournalPlan(id, tm) } if tm.Database != "" && tm.Database != vs.cp.DbName { diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 110ba8902c..95ca6aaef6 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -117,23 +117,37 @@ message StreamTablesResponse { // Rule represents one rule. message Rule { - // match can be a table name or a regular expression - // delineated by '/' and '/'. + // Match can be a table name or a regular expression. + // If it starts with a '/', it's a regular expression. + // For example, "t" matches a table named "t", whereas + // "/t.*" matches all tables that begin with 't'. string match = 1; - // filter can be an empty string or keyrange if the match - // is a regular expression. Otherwise, it must be a select - // query. + // Filter: If empty, all columns and rows of the matching tables + // are sent. If it's a keyrange, only rows that match the + // keyrange are sent. + // If Match is a table name instead of a regular expression, + // the Filter can also be a select expression. + // What is allowed in a select expression depends on whether + // it's a vstreamer or vreplication request. string filter = 2; } -// Filter represents a list of ordered rules. First match -// wins. +// Filter represents a list of ordered rules. The first +// match wins. message Filter { repeated Rule rules = 1; enum FieldEventMode { ERR_ON_MISMATCH = 0; BEST_EFFORT = 1; } + // FieldEventMode specifies the behavior if there is a mismatch + // between the current schema and the fields in the binlog. This + // can happen if the binlog position is before a DDL that would + // cause the fields to change. If vstreamer detects such + // an inconsistency, the behavior depends on the FieldEventMode. + // If the value is ERR_ON_MISMATCH (default), then it errors out. + // If it's BEST_EFFORT, it sends a field event with fake column + // names as "@1", "@2", etc. 
FieldEventMode fieldEventMode = 2; } @@ -146,8 +160,8 @@ enum OnDDLAction { } // BinlogSource specifies the source and filter parameters for -// Filtered Replication. It currently supports a keyrange -// or a list of tables. +// Filtered Replication. KeyRange and Tables are legacy. Filter +// is the new way to specify the filtering rules. message BinlogSource { // the source keyspace string keyspace = 1; @@ -158,30 +172,29 @@ message BinlogSource { // the source tablet type topodata.TabletType tablet_type = 3; - // key_range is set if the request is for a keyrange + // KeyRange is set if the request is for a keyrange topodata.KeyRange key_range = 4; - // tables is set if the request is for a list of tables + // Tables is set if the request is for a list of tables repeated string tables = 5; - // filter is set if we're using the generalized representation + // Filter is set if we're using the generalized representation // for the filter. Filter filter = 6; - // on_ddl specifies the action to be taken when a DDL is encountered. + // OnDdl specifies the action to be taken when a DDL is encountered. OnDDLAction on_ddl = 7; // Source is an external mysql. This attribute should be set to the username // to use in the connection string external_mysql = 8; - // stop_after_copy specifies if vreplication should be stopped + // StopAfterCopy specifies if vreplication should be stopped // after copying is done. bool stop_after_copy = 9; } -// VEventType enumerates the event types. -// This list is comprehensive. Many of these types +// VEventType enumerates the event types. Many of these types // will not be encountered in RBR mode. enum VEventType { UNKNOWN = 0; @@ -190,46 +203,65 @@ enum VEventType { COMMIT = 3; ROLLBACK = 4; DDL = 5; + // INSERT, REPLACE, UPDATE, DELETE and SET will not be seen in RBR mode. INSERT = 6; REPLACE = 7; UPDATE = 8; DELETE = 9; SET = 10; + // OTHER is a dummy event. 
If encountered, the current GTID must be + // recorded by the client to be able to resume. OTHER = 11; ROW = 12; FIELD = 13; + // HEARTBEAT is sent if there is inactivity. If a client does not + // receive events beyond the heartbeat interval, it can assume that it's + // lost connection to the vstreamer. HEARTBEAT = 14; + // VGTID is generated by VTGate's VStream that combines multiple + // GTIDs. VGTID = 15; JOURNAL = 16; } -// RowChange represents one row change +// RowChange represents one row change. +// If Before is set and not After, it's a delete. +// If After is set and not Before, it's an insert. +// If both are set, it's an update. message RowChange { query.Row before = 1; query.Row after = 2; } -// RowEvent represent row events for one table +// RowEvent represent row events for one table. message RowEvent { string table_name = 1; repeated RowChange row_changes = 2; } +// FieldEvent represents the field info for a table. message FieldEvent { string table_name = 1; repeated query.Field fields = 2; } +// ShardGtid contains the GTID position for one shard. +// It's used in a request for requesting a starting position. +// It's used in a response to transmit the current position +// of a shard. It's also used in a Journal to indicate the +// list of targets and shard positions to migrate to. message ShardGtid { string keyspace = 1; string shard = 2; string gtid = 3; } +// A VGtid is a list of ShardGtids. message VGtid { repeated ShardGtid shard_gtids = 1; } +// KeyspaceShard represents a keyspace and shard. message KeyspaceShard { string keyspace = 1; string shard = 2; @@ -241,28 +273,63 @@ enum MigrationType { SHARDS = 1; } +// Journal contains the metadata for a journal event. +// The commit of a journal event indicates the point of no return +// for a migration. message Journal { + // Id represents a unique journal id. int64 id = 1; MigrationType migration_type = 2; + // Tables is set if the journal represents a TABLES migration. 
repeated string tables = 3; + // LocalPosition is the source position at which the migration happened. string local_position = 4; + // ShardGtids is the list of targets to which the migration took place. repeated ShardGtid shard_gtids = 5; + // Participants is the list of source participants for a migration. + // Every participant is expected to have an identical journal entry. + // While streaming, the client must wait for the journal entry to + // be received from all participants, and then replace them with new + // streams specified by ShardGtid. + // If a stream does not have all participants, a consistent migration + // is not possible. repeated KeyspaceShard participants = 6; + // SourceWorkflows is the list of workflows in the source shard that + // were migrated to the target. If a migration fails after a Journal + // is committed, this information is used to start the target streams + // that were created prior to the creation of the journal. repeated string source_workflows = 7; } -// VEvent represents a vstream event +// VEvent represents a vstream event. +// A FieldEvent is sent once for every table, just before +// the first event for that table. The client is expected +// to cache this information and match it against the RowEvent +// which contains the table name. +// A GTID event always precedes a committable event, which can be +// COMMIT, DDL or OTHER. +// OTHER events are non-material events that have no metadata. message VEvent { VEventType type = 1; + // Timestamp is the binlog timestamp in seconds. + // It's set for all events. int64 timestamp = 2; + // Gtid is set if the event type is GTID. string gtid = 3; + // Ddl is set if the event type is DDL. string ddl = 4; + // RowEvent is set if the event type is ROW. RowEvent row_event = 5; + // FieldEvent is set if the event type is FIELD. FieldEvent field_event = 6; + // Vgtid is set if the event type is VGTID. + // This event is only generated by VTGate's VStream function. 
VGtid vgtid = 7; + // Journal is set if the event type is JOURNAL. Journal journal = 8; + // Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE. string dml = 9; - // current_time specifies the current time to handle clock skew. + // CurrentTime specifies the current time to handle clock skew. int64 current_time = 20; }