Mirror of https://github.com/github/vitess-gh.git
Merge pull request #2124 from alainjobart/mysql56
Fixing test/cache_invalidation.py for MySQL 5.6
Commit 459080ada1
@@ -39,8 +39,8 @@ unsigned long cli_safe_read(MYSQL *mysql);
// st_mysql_methods and simple_command are declared in mysql.h in
// Google MySQL 5.1 (VERSION_ID=501xx), but were moved to sql_common.h in
// MariaDB (VERSION_ID=1000xx) and MySQL 5.6 (VERSION_ID=506xx).
#if MYSQL_VERSION_ID >= 50600 // MySQL version >= 5.6
// MariaDB (VERSION_ID=1000xx) and MySQL 5.5 (VERSION_ID=505xx).
#if MYSQL_VERSION_ID >= 50500 // MySQL version >= 5.5

typedef struct st_mysql_methods
{
@@ -131,7 +131,7 @@ func (bls *Streamer) Stream(ctx context.Context) (err error) {
var events <-chan replication.BinlogEvent
if bls.timestamp != 0 {
events, err = bls.conn.StartBinlogDumpFromTimestamp(ctx, bls.timestamp)
events, err = bls.conn.StartBinlogDumpFromBinlogBeforeTimestamp(ctx, bls.timestamp)
} else if !bls.startPos.IsZero() {
events, err = bls.conn.StartBinlogDumpFromPosition(ctx, bls.startPos)
} else {

@@ -174,18 +174,20 @@ func (bls *Streamer) parseEvents(ctx context.Context, events <-chan replication.
// A commit can be triggered either by a COMMIT query, or by an XID_EVENT.
// Statements that aren't wrapped in BEGIN/COMMIT are committed immediately.
commit := func(timestamp uint32) error {
trans := &binlogdatapb.BinlogTransaction{
Statements: statements,
EventToken: &querypb.EventToken{
Timestamp: int64(timestamp),
Position: replication.EncodePosition(pos),
},
}
if err = bls.sendTransaction(trans); err != nil {
if err == io.EOF {
return ErrClientEOF
if int64(timestamp) >= bls.timestamp {
trans := &binlogdatapb.BinlogTransaction{
Statements: statements,
EventToken: &querypb.EventToken{
Timestamp: int64(timestamp),
Position: replication.EncodePosition(pos),
},
}
if err = bls.sendTransaction(trans); err != nil {
if err == io.EOF {
return ErrClientEOF
}
return fmt.Errorf("send reply error: %v", err)
}
return fmt.Errorf("send reply error: %v", err)
}
statements = nil
autocommit = true

@@ -328,6 +330,22 @@ func (bls *Streamer) parseEvents(ctx context.Context, events <-chan replication.
}
}
}
case ev.IsPreviousGTIDs(): // PREVIOUS_GTIDS_EVENT
// MySQL 5.6 only: The Binlogs contain an
// event that gives us all the previously
// applied commits. It is an authoritative
// value, so we use that now. When we start
// streaming from the beginning of a binlog
// file (when starting with a timestamp), this
// will give us the full position.
newPos, err := ev.PreviousGTIDs(format)
if err != nil {
return pos, err
}
pos = newPos
if err = commit(ev.Timestamp()); err != nil {
return pos, err
}
}
}
}
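Note: the change above makes commit() drop transactions whose timestamp is older than the requested bls.timestamp, while parseEvents still tracks the position of every event it reads. The standalone sketch below is not Vitess code; the type and names are purely illustrative. It only shows the shape of that filter: the position advances for every transaction, but only transactions at or after the target timestamp are emitted.

package main

import "fmt"

// fakeTx stands in for a parsed binlog transaction; only the fields
// needed to illustrate the filtering idea are modeled here.
type fakeTx struct {
	gtid      string // position contribution of this transaction
	timestamp int64  // commit timestamp from the event header
}

func main() {
	target := int64(1407805592) // requested start timestamp
	txs := []fakeTx{
		{"1-100-8", 1407805590}, // before target: tracked, not sent
		{"1-100-9", 1407805592}, // at target: tracked and sent
		{"1-100-10", 1407805600},
	}

	var pos []string // stands in for the accumulated GTIDSet
	for _, tx := range txs {
		// The position must be updated for every transaction we read,
		// even the ones we do not forward; otherwise the reported
		// EventToken.Position would be incomplete.
		pos = append(pos, tx.gtid)
		if tx.timestamp >= target {
			fmt.Printf("send tx %s at pos %v\n", tx.gtid, pos)
		}
	}
}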
@@ -33,6 +33,7 @@ func (fakeEvent) IsGTID() bool { return false }
func (fakeEvent) IsRotate() bool { return false }
func (fakeEvent) IsIntVar() bool { return false }
func (fakeEvent) IsRand() bool { return false }
func (fakeEvent) IsPreviousGTIDs() bool { return false }
func (fakeEvent) HasGTID(replication.BinlogFormat) bool { return true }
func (fakeEvent) Timestamp() uint32 { return 1407805592 }
func (fakeEvent) Format() (replication.BinlogFormat, error) {

@@ -41,6 +42,9 @@ func (fakeEvent) Format() (replication.BinlogFormat, error) {
func (fakeEvent) GTID(replication.BinlogFormat) (replication.GTID, error) {
return replication.MariadbGTID{Domain: 0, Server: 62344, Sequence: 0xd}, nil
}
func (fakeEvent) PreviousGTIDs(replication.BinlogFormat) (replication.Position, error) {
return replication.Position{}, errors.New("not a PreviousGTIDs")
}
func (fakeEvent) IsBeginGTID(replication.BinlogFormat) bool { return false }
func (fakeEvent) Query(replication.BinlogFormat) (replication.Query, error) {
return replication.Query{}, errors.New("not a query")
@@ -119,6 +119,11 @@ func (ev binlogEvent) IsRand() bool {
return ev.Type() == 13
}

// IsPreviousGTIDs implements BinlogEvent.IsPreviousGTIDs().
func (ev binlogEvent) IsPreviousGTIDs() bool {
return ev.Type() == 35
}

// Format implements BinlogEvent.Format().
//
// Expected format (L = total length of event data):
@@ -251,6 +251,11 @@ func (ev mariadbBinlogEvent) GTID(f replication.BinlogFormat) (replication.GTID,
}, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
func (ev mariadbBinlogEvent) PreviousGTIDs(f replication.BinlogFormat) (replication.Position, error) {
return replication.Position{}, fmt.Errorf("MariaDB should not provide PREVIOUS_GTIDS_EVENT events")
}

// StripChecksum implements BinlogEvent.StripChecksum().
func (ev mariadbBinlogEvent) StripChecksum(f replication.BinlogFormat) (replication.BinlogEvent, []byte, error) {
switch f.ChecksumAlgorithm {
@@ -211,6 +211,18 @@ func (ev mysql56BinlogEvent) GTID(f replication.BinlogFormat) (replication.GTID,
return replication.Mysql56GTID{Server: sid, Sequence: gno}, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
func (ev mysql56BinlogEvent) PreviousGTIDs(f replication.BinlogFormat) (replication.Position, error) {
data := ev.Bytes()[f.HeaderLength:]
set, err := replication.NewMysql56GTIDSetFromSIDBlock(data)
if err != nil {
return replication.Position{}, err
}
return replication.Position{
GTIDSet: set,
}, nil
}

// StripChecksum implements BinlogEvent.StripChecksum().
func (ev mysql56BinlogEvent) StripChecksum(f replication.BinlogFormat) (replication.BinlogEvent, []byte, error) {
switch f.ChecksumAlgorithm {
@@ -54,6 +54,8 @@ type BinlogEvent interface {
// because it's a GTID_EVENT (MariaDB, MySQL 5.6), or because it is some
// arbitrary event type that has a GTID in the header (Google MySQL).
HasGTID(BinlogFormat) bool
// IsPreviousGTIDs returns true if this event is a PREVIOUS_GTIDS_EVENT.
IsPreviousGTIDs() bool

// Timestamp returns the timestamp from the event header.
Timestamp() uint32

@@ -78,6 +80,9 @@ type BinlogEvent interface {
// Rand returns the two seed values for a RAND_EVENT.
// This is only valid if IsRand() returns true.
Rand(BinlogFormat) (uint64, uint64, error)
// PreviousGTIDs returns the Position from the event.
// This is only valid if IsPreviousGTIDs() returns true.
PreviousGTIDs(BinlogFormat) (Position, error)

// StripChecksum returns the checksum and a modified event with the checksum
// stripped off, if any. If there is no checksum, it returns the same event
@@ -348,6 +348,51 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}

// NewMysql56GTIDSetFromSIDBlock builds a Mysql56GTIDSet from parsing a SID Block.
// This is the reverse of the SIDBlock method.
//
// Expected format:
//   # bytes   field
//   8         nSIDs
//   (nSIDs times)
//     16        SID
//     8         nIntervals
//     (nIntervals times)
//       8         start
//       8         end
func NewMysql56GTIDSetFromSIDBlock(data []byte) (Mysql56GTIDSet, error) {
buf := bytes.NewReader(data)
var set Mysql56GTIDSet = make(map[SID][]interval)
var nSIDs uint64
if err := binary.Read(buf, binary.LittleEndian, &nSIDs); err != nil {
return nil, fmt.Errorf("cannot read nSIDs: %v", err)
}
for i := uint64(0); i < nSIDs; i++ {
var sid SID
if c, err := buf.Read(sid[:]); err != nil || c != 16 {
return nil, fmt.Errorf("cannot read SID %v: %v %v", i, err, c)
}
var nIntervals uint64
if err := binary.Read(buf, binary.LittleEndian, &nIntervals); err != nil {
return nil, fmt.Errorf("cannot read nIntervals %v: %v", i, err)
}
for j := uint64(0); j < nIntervals; j++ {
var start, end uint64
if err := binary.Read(buf, binary.LittleEndian, &start); err != nil {
return nil, fmt.Errorf("cannot read start %v/%v: %v", i, j, err)
}
if err := binary.Read(buf, binary.LittleEndian, &end); err != nil {
return nil, fmt.Errorf("cannot read end %v/%v: %v", i, j, err)
}
set[sid] = append(set[sid], interval{
start: int64(start),
end:   int64(end - 1),
})
}
}
return set, nil
}

func init() {
gtidSetParsers[mysql56FlavorID] = parseMysql56GTIDSet
}
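Note: the SID-block layout documented above is simple enough to build by hand. The following standalone sketch is not part of the commit; it only uses the Go standard library and follows the field order described in the comment. It assembles a block for one SID with a single interval and prints its length, which should be 8 + 16 + 8 + 8 + 8 = 48 bytes.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func main() {
	var buf bytes.Buffer

	// nSIDs: one server UUID in this set.
	binary.Write(&buf, binary.LittleEndian, uint64(1))

	// SID: the 16 raw bytes of the server UUID (an arbitrary example value).
	sid := [16]byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,
		0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f}
	buf.Write(sid[:])

	// nIntervals: one interval for this SID.
	binary.Write(&buf, binary.LittleEndian, uint64(1))

	// Interval covering sequences 1 through 5: the on-the-wire end is
	// exclusive, which is why the parser above stores end-1 as the
	// inclusive end of the interval.
	binary.Write(&buf, binary.LittleEndian, uint64(1)) // start
	binary.Write(&buf, binary.LittleEndian, uint64(6)) // end (exclusive)

	fmt.Println(buf.Len(), "bytes") // prints: 48 bytes
}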
@@ -437,7 +437,17 @@ func TestMysql56GTIDSetSIDBlock(t *testing.T) {
// sid2: interval 1 end
6, 0, 0, 0, 0, 0, 0, 0,
}
if got := input.SIDBlock(); !reflect.DeepEqual(got, want) {
got := input.SIDBlock()
if !reflect.DeepEqual(got, want) {
t.Errorf("%#v.SIDBlock() = %#v, want %#v", input, got, want)
}

// Testing the conversion back.
set, err := NewMysql56GTIDSetFromSIDBlock(want)
if err != nil {
t.Fatalf("Reconstructing Mysql56GTIDSet from SID block failed: %v", err)
}
if !reflect.DeepEqual(set, input) {
t.Errorf("NewMysql56GTIDSetFromSIDBlock(%#v) = %#v, want %#v", want, set, input)
}
}
@@ -174,21 +174,35 @@ func (sc *SlaveConnection) StartBinlogDumpFromPosition(ctx context.Context, star
return eventChan, nil
}

// StartBinlogDumpFromTimestamp requests a replication binlog dump from
// the master mysqld at the given timestamp and then sends binlog
// events to the provided channel.
// StartBinlogDumpFromBinlogBeforeTimestamp requests a replication
// binlog dump from the master mysqld starting with a file that has
// timestamps smaller than the provided timestamp, and then sends
// binlog events to the provided channel.
//
// The startup phase will list all the binary logs, and find the one
// that has events starting strictly before the provided timestamp. It
// will then start from there, and skip all events that are before the
// provided timestamp.
// will then start from there, and stream all events. It is the
// responsibility of the calling site to filter the events further.
//
// MySQL 5.6+ note: we need to do it that way because of the way the
// GTIDSet works. In the previous two streaming functions, we pass in
// the full GTIDSet (that has the list of all transactions seen in
// the replication stream). In this case, we don't know it, all we
// have is the binlog file names. We depend on parsing the first
// PREVIOUS_GTIDS_EVENT event in the logs to get it. So we need the
// caller to parse that event, and it can't be skipped because its
// timestamp is lower. Then, for each subsequent event, the caller
// also needs to add the event GTID to its GTIDSet. Otherwise it won't
// be correct ever. So the caller really needs to build up its GTIDSet
// along the entire file, not just for events whose timestamp is in a
// given range.
//
// The stream will continue in the background, waiting for new events if
// necessary, until the connection is closed, either by the master or
// by canceling the context.
//
// Note the context is valid and used until eventChan is closed.
func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, timestamp int64) (<-chan replication.BinlogEvent, error) {
func (sc *SlaveConnection) StartBinlogDumpFromBinlogBeforeTimestamp(ctx context.Context, timestamp int64) (<-chan replication.BinlogEvent, error) {
ctx, sc.cancel = context.WithCancel(ctx)

flavor, err := sc.mysqld.flavor()
@@ -204,7 +218,7 @@ func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, tim

// Start with the most recent binlog file until we find the right event.
var binlogIndex int
var firstEvent replication.BinlogEvent
var event replication.BinlogEvent
for binlogIndex = len(binlogs.Rows) - 1; binlogIndex >= 0; binlogIndex-- {
// Exit the loop early if context is canceled.
select {

@@ -222,8 +236,9 @@ func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, tim
return nil, fmt.Errorf("failed to send the ComBinlogDump command: %v", err)
}

// Get the first event to get its timestamp. We skip ROTATE
// events, as they don't have timestamps.
// Get the first event to get its timestamp. We skip
// events that don't have timestamps (although it seems
// most do anyway).
for {
buf, err := sc.Conn.ReadPacket()
if err != nil {

@@ -236,15 +251,17 @@ func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, tim
}

// Parse the full event.
firstEvent = flavor.MakeBinlogEvent(buf[1:])
if !firstEvent.IsValid() {
event = flavor.MakeBinlogEvent(buf[1:])
if !event.IsValid() {
return nil, fmt.Errorf("first event from binlog %v is not valid", binlog)
}
if !firstEvent.IsRotate() {
if event.Timestamp() > 0 {
// We found the first event with a
// valid timestamp.
break
}
}
if int64(firstEvent.Timestamp()) < timestamp {
if int64(event.Timestamp()) < timestamp {
// The first event in this binlog has a smaller
// timestamp than what we need, we found a good
// starting point.

@@ -266,35 +283,6 @@ func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, tim
return nil, ErrBinlogUnavailable
}

// Now skip all events that have a smaller timestamp
var event replication.BinlogEvent
for {
buf, err := sc.Conn.ReadPacket()
if err != nil {
return nil, fmt.Errorf("error reading packet while skipping binlog events: %v", err)
}
if buf[0] == 254 {
// The master is telling us to stop.
return nil, fmt.Errorf("received EOF packet in binlog dump while skipping packets: %#v", buf)
}

event = flavor.MakeBinlogEvent(buf[1:])
if !event.IsValid() {
return nil, fmt.Errorf("event from binlog is not valid (while skipping)")
}
if int64(event.Timestamp()) >= timestamp {
// we found the first event to send
break
}

// Exit the loop early if context is canceled.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
}

// Now just loop sending and reading events.
// FIXME(alainjobart) I think we can use a buffered channel for better performance.
eventChan := make(chan replication.BinlogEvent)

@@ -307,14 +295,6 @@ func (sc *SlaveConnection) StartBinlogDumpFromTimestamp(ctx context.Context, tim
sc.wg.Done()
}()

// Send the first binlog event, it has the format description.
select {
case eventChan <- firstEvent:
case <-ctx.Done():
return
}

// Then send the rest.
for {
select {
case eventChan <- event:
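Note: the doc comment on StartBinlogDumpFromBinlogBeforeTimestamp shifts the filtering work to the caller, which must parse the PREVIOUS_GTIDS_EVENT for the starting position and then fold every later GTID into that position, even for events older than the requested timestamp. The fragment below is only a sketch of such a consumer, not the committed implementation: it assumes the BinlogEvent methods shown in this change (IsPreviousGTIDs, PreviousGTIDs, HasGTID, GTID, Timestamp) and an AppendGTID helper for merging a GTID into a Position, which is an assumption here.

// Sketch only: accumulate the position while draining the event channel.
func consume(events <-chan replication.BinlogEvent, format replication.BinlogFormat, startTimestamp int64) (replication.Position, error) {
	var pos replication.Position
	for ev := range events {
		switch {
		case ev.IsPreviousGTIDs():
			// Authoritative starting position from the head of the binlog file.
			p, err := ev.PreviousGTIDs(format)
			if err != nil {
				return pos, err
			}
			pos = p
		case ev.HasGTID(format):
			// Every GTID must be folded in, even for events older than
			// startTimestamp, or the accumulated position drifts.
			gtid, err := ev.GTID(format)
			if err != nil {
				return pos, err
			}
			pos = replication.AppendGTID(pos, gtid) // assumed helper
		}
		if int64(ev.Timestamp()) < startTimestamp {
			continue // tracked above, but not yet interesting to the caller
		}
		// ... handle the event ...
	}
	return pos, nil
}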
@@ -264,7 +264,7 @@ class InvalidatorThread(threading.Thread):
self.timestamp)

def invalidate(self, table_name, row_id, token):
logging.debug('Invalidating %s(%d):', table_name, row_id)
logging.debug('Invalidating %s(%d) - %s:', table_name, row_id, token)
version, cache_event_token, _ = self.cache.gets(table_name, row_id)
if version is None:
logging.debug(' no entry in cache, saving event_token')
|
|
Загрузка…
Ссылка в новой задаче