vrepl: document vstreamer.go, binlogdata.proto

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-12-30 21:11:57 -08:00
Родитель 68a701ced7
Коммит 997835076a
3 изменённых файлов: 187 добавлений и 50 удалений

Просмотреть файл

@ -599,7 +599,8 @@ func (m *StreamTablesResponse) GetBinlogTransaction() *BinlogTransaction {
// Rule represents one rule.
type Rule struct {
// match can be a table name or a regular expression
// delineated by '/' and '/'.
// if it starts with a '/'. For example, "/.*" matches
// all tables.
Match string `protobuf:"bytes,1,opt,name=match,proto3" json:"match,omitempty"`
// filter can be an empty string or keyrange if the match
// is a regular expression. Otherwise, it must be a select
@ -699,8 +700,8 @@ func (m *Filter) GetFieldEventMode() Filter_FieldEventMode {
}
// BinlogSource specifies the source and filter parameters for
// Filtered Replication. It currently supports a keyrange
// or a list of tables.
// Filtered Replication. KeyRange and Tables are legacy. Filter
// is the new way to specify the filtering rules.
type BinlogSource struct {
// the source keyspace
Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"`
@ -816,7 +817,10 @@ func (m *BinlogSource) GetStopAfterCopy() bool {
return false
}
// RowChange represents one row change
// RowChange represents one row change.
// If Before is set and not After, it's a delete.
// If After is set and not Before, it's an insert.
// If both are set, it's an update.
type RowChange struct {
Before *query.Row `protobuf:"bytes,1,opt,name=before,proto3" json:"before,omitempty"`
After *query.Row `protobuf:"bytes,2,opt,name=after,proto3" json:"after,omitempty"`
@ -864,7 +868,7 @@ func (m *RowChange) GetAfter() *query.Row {
return nil
}
// RowEvent represent row events for one table
// RowEvent represents row events for one table.
type RowEvent struct {
TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"`
RowChanges []*RowChange `protobuf:"bytes,2,rep,name=row_changes,json=rowChanges,proto3" json:"row_changes,omitempty"`
@ -912,6 +916,7 @@ func (m *RowEvent) GetRowChanges() []*RowChange {
return nil
}
// FieldEvent represents the field info for a table.
type FieldEvent struct {
TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"`
Fields []*query.Field `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"`
@ -959,6 +964,11 @@ func (m *FieldEvent) GetFields() []*query.Field {
return nil
}
// ShardGtid contains the GTID position for one shard.
// It's used in a request for requesting a starting position.
// It's used in a response to transmit the current position
// of a shard. It's also used in a Journal to indicate the
// list of targets and shard positions to migrate to.
type ShardGtid struct {
Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"`
Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"`
@ -1014,6 +1024,7 @@ func (m *ShardGtid) GetGtid() string {
return ""
}
// A VGtid is a list of ShardGtids.
type VGtid struct {
ShardGtids []*ShardGtid `protobuf:"bytes,1,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
@ -1053,6 +1064,7 @@ func (m *VGtid) GetShardGtids() []*ShardGtid {
return nil
}
// KeyspaceShard represents a keyspace and shard.
type KeyspaceShard struct {
Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"`
Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"`
@ -1100,17 +1112,25 @@ func (m *KeyspaceShard) GetShard() string {
return ""
}
// Journal contains the metadata for a journal event.
type Journal struct {
Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"`
Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"`
LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"`
ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"`
Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"`
SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
// Id represents a unique journal id.
Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"`
// Tables is set if the journal represents a TABLES migration.
Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"`
// LocalPosition is the source position at which the migration happened.
LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"`
// ShardGtids is the list of targets to which the migration took place.
ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"`
// Participants is the list of source participants for a migration.
Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"`
// SourceWorkflows is the list of workflows in the source shard that need
// to be migrated to the target.
SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Journal) Reset() { *m = Journal{} }
@ -1187,18 +1207,32 @@ func (m *Journal) GetSourceWorkflows() []string {
return nil
}
// VEvent represents a vstream event
// VEvent represents a vstream event.
// A FieldEvent is sent once for every table, just before
// the first event for that table. The client is expected
// to cache this information and match it against the RowEvent
// which contains the table name.
type VEvent struct {
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"`
Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"`
RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"`
// Timestamp is the binlog timestamp in seconds.
// It's set for all events.
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Gtid is set if the event type is GTID.
Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"`
// Ddl is set if the event type is DDL.
Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"`
// RowEvent is set if the event type is ROW.
RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"`
// FieldEvent is set if the event type is FIELD.
FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"`
Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"`
Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"`
Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"`
// current_time specifies the current time to handle clock skew.
// Vgtid is set if the event type is VGTID.
// This event is only generated by VTGate's VStream function.
Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"`
// Journal is set if the event type is JOURNAL.
Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"`
// Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE.
Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"`
// CurrentTime specifies the current time to handle clock skew.
CurrentTime int64 `protobuf:"varint,20,opt,name=current_time,json=currentTime,proto3" json:"current_time,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`

Просмотреть файл

@ -64,6 +64,7 @@ func NewVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine,
// TODO(sougou): find a better way for this.
var vschemaUpdateCount sync2.AtomicInt64
// vstreamer is for serving a single vreplication stream on the source side.
type vstreamer struct {
ctx context.Context
cancel func()
@ -85,12 +86,25 @@ type vstreamer struct {
}
// streamerPlan extends the original plan to also include
// the TableMap which is used to extract values from the binlog events.
// the TableMap, which comes from the binlog. It's used
// to extract values from the ROW events.
type streamerPlan struct {
*Plan
TableMap *mysql.TableMap
}
// newVStreamer creates a new vstreamer.
// cp: the mysql conn params.
// se: the schema engine. The vstreamer uses it to convert the TableMap into field info.
// startPos: a flavor compliant position to stream from. This can also contain the special
// value "current", which means start from the current position.
// filter: the list of filtering rules. If a rule has a select expression for its filter,
// the select list can only reference direct columns. No other expressions are allowed.
// The select expression is allowed to contain the special 'keyspace_id()' function which
// will return the keyspace id of the row. For more info, see the documentation
// for binlogdatapb.Filter.
// vschema: the current vschema. This value can later be changed through the SetVSchema method.
// send: callback function to send events.
func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer {
ctx, cancel := context.WithCancel(ctx)
return &vstreamer{
@ -107,7 +121,7 @@ func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine,
}
}
// SetVSchema updates all existing streams against the new vschema.
// SetVSchema updates the vstreamer against the new vschema.
func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
// Since vs.Stream is a single-threaded loop, we just send an event to
// that thread, which helps us avoid mutexes to update the plans.
@ -117,14 +131,16 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
}
}
// Cancel stops the streaming.
func (vs *vstreamer) Cancel() {
vs.cancel()
}
// Stream runs a single-threaded loop.
// Stream streams binlog events.
func (vs *vstreamer) Stream() error {
defer vs.cancel()
// Validate the request against the current position.
curPos, err := vs.currentPosition()
if err != nil {
return vterrors.Wrap(err, "could not obtain current position")
@ -171,13 +187,22 @@ func (vs *vstreamer) currentPosition() (mysql.Position, error) {
return conn.MasterPosition()
}
// parseEvents parses and sends events.
func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.BinlogEvent) error {
// bufferAndTransmit uses bufferedEvents and curSize to buffer events.
var (
bufferedEvents []*binlogdatapb.VEvent
curSize int
)
// Buffering only takes row lengths into consideration.
// Only the following patterns are possible:
// BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks.
// BEGIN->JOURNAL->GTID->COMMIT
// GTID->DDL
// GTID->OTHER
// HEARTBEAT is issued if there's inactivity, which is likely
// to happen between one group of events and another.
//
// Buffering only takes row or statement lengths into consideration.
// Length of other events is considered negligible.
// If a new row event causes the packet size to be exceeded,
// all existing rows are sent without the new row.
@ -186,9 +211,14 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
switch vevent.Type {
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, binlogdatapb.VEventType_JOURNAL:
// We never have to send GTID, BEGIN, FIELD events on their own.
// A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT.
// So, we don't have to send it right away.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT:
// COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent.
// Although unlikely, it's possible to get a HEARTBEAT in the middle
// of a transaction. If so, we still send the partial transaction along
// with the heartbeat.
bufferedEvents = append(bufferedEvents, vevent)
vevents := bufferedEvents
bufferedEvents = nil
@ -287,8 +317,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
}
// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
// Validate the buffer before reading fields from it.
if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
@ -428,6 +458,11 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
case ev.IsTableMap():
// This is very frequent. It precedes every row event.
// If it's the first time for a table, we generate a FIELD
// event, and also cache the plan. Subsequent TableMap events
// for that table id don't generate VEvents.
// A schema change will result in a change in table id, which
// will generate a new plan and FIELD event.
id := ev.TableID(vs.format)
if _, ok := vs.plans[id]; ok {
return nil, nil
@ -437,6 +472,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
return nil, err
}
if tm.Database == "_vt" && tm.Name == "resharding_journal" {
// A journal is a special case that generates a JOURNAL event.
return nil, vs.buildJournalPlan(id, tm)
}
if tm.Database != "" && tm.Database != vs.cp.DbName {

Просмотреть файл

@ -117,23 +117,37 @@ message StreamTablesResponse {
// Rule represents one rule.
message Rule {
// match can be a table name or a regular expression
// delineated by '/' and '/'.
// Match can be a table name or a regular expression.
// If it starts with a '/', it's a regular expression.
// For example, "t" matches a table named "t", whereas
// "/t.*" matches all tables that begin with 't'.
string match = 1;
// filter can be an empty string or keyrange if the match
// is a regular expression. Otherwise, it must be a select
// query.
// Filter: If empty, all columns and rows of the matching tables
// are sent. If it's a keyrange, only rows that match the
// keyrange are sent.
// If Match is a table name instead of a regular expression,
// the Filter can also be a select expression.
// What is allowed in a select expression depends on whether
// it's a vstreamer or vreplication request.
string filter = 2;
}
// Filter represents a list of ordered rules. First match
// wins.
// Filter represents a list of ordered rules. The first
// match wins.
message Filter {
repeated Rule rules = 1;
enum FieldEventMode {
ERR_ON_MISMATCH = 0;
BEST_EFFORT = 1;
}
// FieldEventMode specifies the behavior if there is a mismatch
// between the current schema and the fields in the binlog. This
// can happen if the binlog position is before a DDL that would
// cause the fields to change. If vstreamer detects such
// an inconsistency, the behavior depends on the FieldEventMode.
// If the value is ERR_ON_MISMATCH (default), then it errors out.
// If it's BEST_EFFORT, it sends a field event with fake column
// names as "@1", "@2", etc.
FieldEventMode fieldEventMode = 2;
}
@ -146,8 +160,8 @@ enum OnDDLAction {
}
// BinlogSource specifies the source and filter parameters for
// Filtered Replication. It currently supports a keyrange
// or a list of tables.
// Filtered Replication. KeyRange and Tables are legacy. Filter
// is the new way to specify the filtering rules.
message BinlogSource {
// the source keyspace
string keyspace = 1;
@ -158,30 +172,29 @@ message BinlogSource {
// the source tablet type
topodata.TabletType tablet_type = 3;
// key_range is set if the request is for a keyrange
// KeyRange is set if the request is for a keyrange
topodata.KeyRange key_range = 4;
// tables is set if the request is for a list of tables
// Tables is set if the request is for a list of tables
repeated string tables = 5;
// filter is set if we're using the generalized representation
// Filter is set if we're using the generalized representation
// for the filter.
Filter filter = 6;
// on_ddl specifies the action to be taken when a DDL is encountered.
// OnDdl specifies the action to be taken when a DDL is encountered.
OnDDLAction on_ddl = 7;
// Source is an external mysql. This attribute should be set to the username
// to use in the connection
string external_mysql = 8;
// stop_after_copy specifies if vreplication should be stopped
// StopAfterCopy specifies if vreplication should be stopped
// after copying is done.
bool stop_after_copy = 9;
}
// VEventType enumerates the event types.
// This list is comprehensive. Many of these types
// VEventType enumerates the event types. Many of these types
// will not be encountered in RBR mode.
enum VEventType {
UNKNOWN = 0;
@ -190,46 +203,65 @@ enum VEventType {
COMMIT = 3;
ROLLBACK = 4;
DDL = 5;
// INSERT, REPLACE, UPDATE, DELETE and SET will not be seen in RBR mode.
INSERT = 6;
REPLACE = 7;
UPDATE = 8;
DELETE = 9;
SET = 10;
// OTHER is a dummy event. If encountered, the current GTID must be
// recorded by the client to be able to resume.
OTHER = 11;
ROW = 12;
FIELD = 13;
// HEARTBEAT is sent if there is inactivity. If a client does not
// receive events beyond the heartbeat interval, it can assume that it's
// lost connection to the vstreamer.
HEARTBEAT = 14;
// VGTID is generated by VTGate's VStream that combines multiple
// GTIDs.
VGTID = 15;
JOURNAL = 16;
}
// RowChange represents one row change
// RowChange represents one row change.
// If Before is set and not After, it's a delete.
// If After is set and not Before, it's an insert.
// If both are set, it's an update.
message RowChange {
query.Row before = 1;
query.Row after = 2;
}
// RowEvent represent row events for one table
// RowEvent represents row events for one table.
message RowEvent {
string table_name = 1;
repeated RowChange row_changes = 2;
}
// FieldEvent represents the field info for a table.
message FieldEvent {
string table_name = 1;
repeated query.Field fields = 2;
}
// ShardGtid contains the GTID position for one shard.
// It's used in a request for requesting a starting position.
// It's used in a response to transmit the current position
// of a shard. It's also used in a Journal to indicate the
// list of targets and shard positions to migrate to.
message ShardGtid {
string keyspace = 1;
string shard = 2;
string gtid = 3;
}
// A VGtid is a list of ShardGtids.
message VGtid {
repeated ShardGtid shard_gtids = 1;
}
// KeyspaceShard represents a keyspace and shard.
message KeyspaceShard {
string keyspace = 1;
string shard = 2;
@ -241,28 +273,63 @@ enum MigrationType {
SHARDS = 1;
}
// Journal contains the metadata for a journal event.
// The commit of a journal event indicates the point of no return
// for a migration.
message Journal {
// Id represents a unique journal id.
int64 id = 1;
MigrationType migration_type = 2;
// Tables is set if the journal represents a TABLES migration.
repeated string tables = 3;
// LocalPosition is the source position at which the migration happened.
string local_position = 4;
// ShardGtids is the list of targets to which the migration took place.
repeated ShardGtid shard_gtids = 5;
// Participants is the list of source participants for a migration.
// Every participant is expected to have an identical journal entry.
// While streaming, the client must wait for the journal entry to
// be received from all participants, and then replace them with new
// streams specified by ShardGtid.
// If a stream does not have all participants, a consistent migration
// is not possible.
repeated KeyspaceShard participants = 6;
// SourceWorkflows is the list of workflows in the source shard that
// were migrated to the target. If a migration fails after a Journal
// is committed, this information is used to start the target streams
// that were created prior to the creation of the journal.
repeated string source_workflows = 7;
}
// VEvent represents a vstream event
// VEvent represents a vstream event.
// A FieldEvent is sent once for every table, just before
// the first event for that table. The client is expected
// to cache this information and match it against the RowEvent
// which contains the table name.
// A GTID event always precedes a committable event, which can be
// COMMIT, DDL or OTHER.
// OTHER events are non-material events that have no metadata.
message VEvent {
VEventType type = 1;
// Timestamp is the binlog timestamp in seconds.
// It's set for all events.
int64 timestamp = 2;
// Gtid is set if the event type is GTID.
string gtid = 3;
// Ddl is set if the event type is DDL.
string ddl = 4;
// RowEvent is set if the event type is ROW.
RowEvent row_event = 5;
// FieldEvent is set if the event type is FIELD.
FieldEvent field_event = 6;
// Vgtid is set if the event type is VGTID.
// This event is only generated by VTGate's VStream function.
VGtid vgtid = 7;
// Journal is set if the event type is JOURNAL.
Journal journal = 8;
// Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE.
string dml = 9;
// current_time specifies the current time to handle clock skew.
// CurrentTime specifies the current time to handle clock skew.
int64 current_time = 20;
}