Merge pull request #6296 from vitessio/find-errant-gtids

Find Errant GTIDs
This commit is contained in:
Deepthi Sigireddi 2020-06-17 11:56:30 -07:00 коммит произвёл GitHub
Родитель 8ff572f285 703fe9bf37
Коммит aa909c5128
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 306 добавлений и 22 удалений

Просмотреть файл

@ -122,6 +122,15 @@ func (mysqlFlavor) status(c *Conn) (SlaveStatus, error) {
func parseMysqlSlaveStatus(resultMap map[string]string) (SlaveStatus, error) {
status := parseSlaveStatus(resultMap)
uuidString := resultMap["Master_UUID"]
if uuidString != "" {
sid, err := ParseSID(uuidString)
if err != nil {
return SlaveStatus{}, vterrors.Wrapf(err, "cannot decode MasterUUID")
}
status.MasterUUID = sid
}
var err error
status.Position.GTIDSet, err = parseMysql56GTIDSet(resultMap["Executed_Gtid_Set"])
if err != nil {

Просмотреть файл

@ -435,6 +435,128 @@ func (set Mysql56GTIDSet) SIDBlock() []byte {
return buf.Bytes()
}
func (set Mysql56GTIDSet) Difference(other Mysql56GTIDSet) Mysql56GTIDSet {
if other == nil || set == nil {
return set
}
// Make a fresh, empty set to hold the new value.
// This function is not supposed to modify the original set.
differenceSet := make(Mysql56GTIDSet)
for sid, intervals := range set {
otherIntervals, ok := other[sid]
if !ok {
// We didn't find SID in other set, so diff should include all intervals for sid unique to receiver.
differenceSet[sid] = intervals
continue
}
// Found server id match between sets, so now we need to subtract each interval.
var diffIntervals []interval
advance := func() bool {
if len(intervals) == 0 {
return false
}
diffIntervals = append(diffIntervals, intervals[0])
intervals = intervals[1:]
return true
}
var otherInterval interval
advanceOther := func() bool {
if len(otherIntervals) == 0 {
return false
}
otherInterval = otherIntervals[0]
otherIntervals = otherIntervals[1:]
return true
}
if !advance() {
continue
}
if !advanceOther() {
differenceSet[sid] = intervals
continue
}
diffLoop:
for {
iv := diffIntervals[len(diffIntervals)-1]
switch {
case iv.end < otherInterval.start:
// [1, 2] - [3, 5]
// Need to skip to next s1 interval. This one is completely before otherInterval even starts. It's a diff in whole.
if !advance() {
break diffLoop
}
case iv.start > otherInterval.end:
// [3, 5] - [1, 2]
// Interval is completely past other interval. We need a valid other to compare against.
if !advanceOther() {
break diffLoop
}
case iv.start >= otherInterval.start && iv.end <= otherInterval.end:
// [3, 4] - [1, 5]
// Interval is completed contained. Pop off diffIntervals, and advance to next s1.
diffIntervals = diffIntervals[:len(diffIntervals)-1]
if !advance() {
break diffLoop
}
case iv.start < otherInterval.start && iv.end >= otherInterval.start && iv.end <= otherInterval.end:
// [1, 4] - [3, 5]
// We have a unique interval prior to where otherInterval starts and should adjust end to match this piece.
diffIntervals[len(diffIntervals)-1].end = otherInterval.start - 1
if !advance() {
break diffLoop
}
case iv.start >= otherInterval.start && iv.start <= otherInterval.end && iv.end > otherInterval.end:
// [3, 7] - [1, 5]
// We have an end piece to deal with.
diffIntervals[len(diffIntervals)-1].start = otherInterval.end + 1
// We need to pop s2 at this point. s1's new interval is fully past otherInterval, so no point in comparing
// this one next round.
if !advanceOther() {
break diffLoop
}
case iv.start < otherInterval.start && iv.end > otherInterval.end:
// [1, 7] - [3, 4]
// End is strictly greater. In this case we need to create an extra diff interval. We'll deal with any necessary trimming of it next round.
diffIntervals[len(diffIntervals)-1].end = otherInterval.start - 1
diffIntervals = append(diffIntervals, interval{start: otherInterval.end + 1, end: iv.end})
// We need to pop s2 at this point. s1's new interval is fully past otherInterval, so no point in comparing
// this one next round.
if !advanceOther() {
break diffLoop
}
default:
panic("This should never happen.")
}
}
if len(intervals) != 0 {
// If we've gotten to this point, then we have intervals that exist beyond the bounds of any intervals in otherIntervals, and they
// are all diffs and should be added in whole.
diffIntervals = append(diffIntervals, intervals...)
}
differenceSet[sid] = diffIntervals
}
return differenceSet
}
// NewMysql56GTIDSetFromSIDBlock builds a Mysql56GTIDSet from parsing a SID Block.
// This is the reverse of the SIDBlock method.
//

Просмотреть файл

@ -446,6 +446,41 @@ func TestMysql56GTIDSetUnion(t *testing.T) {
}
}
func TestMysql56GTIDSetDifference(t *testing.T) {
sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}
sid3 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 17}
sid4 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18}
sid5 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 19}
set1 := Mysql56GTIDSet{
sid1: []interval{{20, 30}, {35, 39}, {40, 53}, {55, 75}},
sid2: []interval{{1, 7}, {20, 50}, {60, 70}},
sid4: []interval{{1, 30}},
sid5: []interval{{1, 7}, {20, 30}},
}
set2 := Mysql56GTIDSet{
sid1: []interval{{20, 30}, {35, 37}, {50, 60}},
sid2: []interval{{3, 5}, {22, 25}, {32, 37}, {67, 70}},
sid3: []interval{{1, 45}},
sid5: []interval{{2, 6}, {15, 40}},
}
got := set1.Difference(set2)
want := Mysql56GTIDSet{
sid1: []interval{{38, 39}, {40, 49}, {61, 75}},
sid2: []interval{{1, 2}, {6, 7}, {20, 21}, {26, 31}, {38, 50}, {60, 66}},
sid4: []interval{{1, 30}},
sid5: []interval{{1, 1}, {7, 7}},
}
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}
}
func TestMysql56GTIDSetSIDBlock(t *testing.T) {
sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}

Просмотреть файл

@ -17,6 +17,8 @@ limitations under the License.
package mysql
import (
"fmt"
replicationdatapb "vitess.io/vitess/go/vt/proto/replicationdata"
"vitess.io/vitess/go/vt/vterrors"
)
@ -38,6 +40,7 @@ type SlaveStatus struct {
MasterHost string
MasterPort int
MasterConnectRetry int
MasterUUID SID
}
// SlaveRunning returns true iff both the Slave IO and Slave SQL threads are
@ -60,6 +63,7 @@ func SlaveStatusToProto(s SlaveStatus) *replicationdatapb.Status {
MasterHost: s.MasterHost,
MasterPort: int32(s.MasterPort),
MasterConnectRetry: int32(s.MasterConnectRetry),
MasterUuid: s.MasterUUID.String(),
}
}
@ -81,6 +85,13 @@ func ProtoToSlaveStatus(s *replicationdatapb.Status) SlaveStatus {
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode FileRelayLogPosition"))
}
var sid SID
if s.MasterUuid != "" {
sid, err = ParseSID(s.MasterUuid)
if err != nil {
panic(vterrors.Wrapf(err, "cannot decode MasterUUID"))
}
}
return SlaveStatus{
Position: pos,
RelayLogPosition: relayPos,
@ -93,5 +104,54 @@ func ProtoToSlaveStatus(s *replicationdatapb.Status) SlaveStatus {
MasterHost: s.MasterHost,
MasterPort: int(s.MasterPort),
MasterConnectRetry: int(s.MasterConnectRetry),
MasterUUID: sid,
}
}
// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
// provided as a list of SlaveStatus's. This method only works if the flavor for all retrieved SlaveStatus's is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
func (s *SlaveStatus) FindErrantGTIDs(otherReplicaStatuses []*SlaveStatus) (Mysql56GTIDSet, error) {
set, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
if !ok {
return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
}
otherSets := make([]Mysql56GTIDSet, 0, len(otherReplicaStatuses))
for _, status := range otherReplicaStatuses {
otherSet, ok := status.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
if !ok {
panic("The receiver SlaveStatus contained a Mysql56GTIDSet in its relay log, but a replica's SlaveStatus is of another flavor. This should never happen.")
}
// Copy and throw out master SID from consideration, so we don't mutate input.
otherSetNoMasterSID := make(Mysql56GTIDSet, len(otherSet))
for sid, intervals := range otherSet {
if sid == status.MasterUUID {
continue
}
otherSetNoMasterSID[sid] = intervals
}
otherSets = append(otherSets, otherSetNoMasterSID)
}
// Copy set for final diffSet so we don't mutate receiver.
diffSet := make(Mysql56GTIDSet, len(set))
for sid, intervals := range set {
if sid == s.MasterUUID {
continue
}
diffSet[sid] = intervals
}
for _, otherSet := range otherSets {
diffSet = diffSet.Difference(otherSet)
}
if len(diffSet) == 0 {
// If diffSet is empty, then we have no errant GTIDs.
return nil, nil
}
return diffSet, nil
}

Просмотреть файл

@ -52,3 +52,51 @@ func TestStatusSlaveSQLNotRunning(t *testing.T) {
t.Errorf("%#v.SlaveRunning() = %v, want %v", input, got, want)
}
}
func TestFindErrantGTIDs(t *testing.T) {
sid1 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
sid2 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 16}
sid3 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 17}
sid4 := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18}
masterSID := SID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 19}
set1 := Mysql56GTIDSet{
sid1: []interval{{20, 30}, {35, 39}, {40, 53}, {55, 75}},
sid2: []interval{{1, 7}, {20, 50}, {60, 70}},
sid4: []interval{{1, 30}},
masterSID: []interval{{1, 7}, {20, 30}},
}
set2 := Mysql56GTIDSet{
sid1: []interval{{20, 30}, {35, 37}, {50, 60}},
sid2: []interval{{3, 5}, {22, 25}, {32, 37}, {67, 70}},
sid3: []interval{{1, 45}},
masterSID: []interval{{2, 6}, {15, 40}},
}
set3 := Mysql56GTIDSet{
sid1: []interval{{20, 30}, {35, 38}, {50, 70}},
sid2: []interval{{3, 5}, {22, 25}, {32, 37}, {67, 70}},
sid3: []interval{{1, 45}},
masterSID: []interval{{2, 6}, {15, 45}},
}
slaveStatus1 := SlaveStatus{MasterUUID: masterSID, RelayLogPosition: Position{GTIDSet: set1}}
slaveStatus2 := SlaveStatus{MasterUUID: masterSID, RelayLogPosition: Position{GTIDSet: set2}}
slaveStatus3 := SlaveStatus{MasterUUID: masterSID, RelayLogPosition: Position{GTIDSet: set3}}
got, err := slaveStatus1.FindErrantGTIDs([]*SlaveStatus{&slaveStatus2, &slaveStatus3})
if err != nil {
t.Errorf("%v", err)
}
want := Mysql56GTIDSet{
sid1: []interval{{39, 39}, {40, 49}, {71, 75}},
sid2: []interval{{1, 2}, {6, 7}, {20, 21}, {26, 31}, {38, 50}, {60, 66}},
sid4: []interval{{1, 30}},
}
if !got.Equal(want) {
t.Errorf("got %#v; want %#v", got, want)
}
}

Просмотреть файл

@ -36,6 +36,7 @@ type Status struct {
MasterHost string `protobuf:"bytes,5,opt,name=master_host,json=masterHost,proto3" json:"master_host,omitempty"`
MasterPort int32 `protobuf:"varint,6,opt,name=master_port,json=masterPort,proto3" json:"master_port,omitempty"`
MasterConnectRetry int32 `protobuf:"varint,7,opt,name=master_connect_retry,json=masterConnectRetry,proto3" json:"master_connect_retry,omitempty"`
MasterUuid string `protobuf:"bytes,12,opt,name=master_uuid,json=masterUuid,proto3" json:"master_uuid,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -143,6 +144,13 @@ func (m *Status) GetMasterConnectRetry() int32 {
return 0
}
func (m *Status) GetMasterUuid() string {
if m != nil {
return m.MasterUuid
}
return ""
}
func init() {
proto.RegisterType((*Status)(nil), "replicationdata.Status")
}
@ -150,26 +158,27 @@ func init() {
func init() { proto.RegisterFile("replicationdata.proto", fileDescriptor_ee8ee22b8c4b9d06) }
var fileDescriptor_ee8ee22b8c4b9d06 = []byte{
// 335 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xdd, 0x4a, 0xc3, 0x30,
0x14, 0x80, 0xa9, 0x73, 0x73, 0xcb, 0xdc, 0x8f, 0x71, 0xc3, 0xe2, 0x8d, 0x45, 0x6f, 0x8a, 0x8c,
0x55, 0x14, 0x5f, 0x60, 0xde, 0x38, 0x50, 0x18, 0xdd, 0x9d, 0x37, 0xa1, 0x6b, 0x63, 0x17, 0x88,
0x39, 0x5d, 0x92, 0x15, 0xf6, 0xa2, 0x3e, 0x8f, 0xf4, 0x64, 0xab, 0x32, 0xbc, 0x6b, 0xbf, 0xef,
0xe3, 0xf4, 0x14, 0x0e, 0x19, 0x6b, 0x5e, 0x48, 0x91, 0x26, 0x56, 0x80, 0xca, 0x12, 0x9b, 0x4c,
0x0b, 0x0d, 0x16, 0xe8, 0xe0, 0x08, 0xdf, 0x7e, 0x37, 0x48, 0x6b, 0x69, 0x13, 0xbb, 0x35, 0xf4,
0x9a, 0xb4, 0x0b, 0x30, 0xa2, 0x52, 0xbe, 0x17, 0x78, 0x61, 0x27, 0xae, 0xdf, 0xe9, 0x84, 0x50,
0xcd, 0x65, 0xb2, 0x63, 0x12, 0x72, 0x56, 0x57, 0x6d, 0xac, 0x86, 0x68, 0xde, 0x20, 0x5f, 0x1c,
0xea, 0x3b, 0xd2, 0xfb, 0x14, 0x92, 0xff, 0x86, 0x1d, 0x0c, 0xcf, 0x2b, 0x58, 0x47, 0xcf, 0xe4,
0x0a, 0xa3, 0x7f, 0xe6, 0x12, 0xcc, 0x47, 0x95, 0x8e, 0x8f, 0x67, 0x87, 0x64, 0xf8, 0x95, 0x18,
0xcb, 0x35, 0x33, 0x5c, 0x97, 0x5c, 0x33, 0x91, 0xf9, 0xdd, 0xc0, 0x0b, 0x7b, 0x71, 0xdf, 0xf1,
0x25, 0xe2, 0x79, 0x56, 0x95, 0x46, 0x26, 0x25, 0x67, 0x02, 0x98, 0xde, 0x2a, 0x25, 0x54, 0xee,
0x9f, 0x04, 0x5e, 0xd8, 0x8e, 0xfb, 0xc8, 0xe7, 0x10, 0x3b, 0x4a, 0xef, 0xc9, 0x85, 0x2b, 0xcd,
0x46, 0xd6, 0x69, 0x03, 0xd3, 0x01, 0x8a, 0xe5, 0x46, 0x1e, 0xda, 0x47, 0x32, 0x36, 0x3c, 0x05,
0x95, 0x19, 0xb6, 0xe2, 0x6b, 0xa1, 0x32, 0xe6, 0x3e, 0xeb, 0x9f, 0xe2, 0x12, 0x97, 0x7b, 0x39,
0x43, 0xf7, 0x8e, 0x8a, 0xde, 0x90, 0xee, 0x7e, 0xe7, 0x35, 0x18, 0xeb, 0x37, 0xf1, 0xf7, 0x88,
0x43, 0xaf, 0x60, 0xec, 0x9f, 0xa0, 0x00, 0x6d, 0xfd, 0x56, 0xe0, 0x85, 0xcd, 0x43, 0xb0, 0x00,
0x6d, 0xe9, 0x03, 0x19, 0xed, 0x83, 0x14, 0x94, 0xe2, 0xa9, 0x65, 0x9a, 0x5b, 0xbd, 0xf3, 0xcf,
0xb0, 0xa4, 0xce, 0xbd, 0x38, 0x15, 0x57, 0x66, 0x36, 0xfd, 0x98, 0x94, 0xc2, 0x72, 0x63, 0xa6,
0x02, 0x22, 0xf7, 0x14, 0xe5, 0x10, 0x95, 0x36, 0xc2, 0x4b, 0x88, 0x8e, 0x0e, 0x61, 0xd5, 0x42,
0xfc, 0xf4, 0x13, 0x00, 0x00, 0xff, 0xff, 0x06, 0x44, 0xd5, 0xd4, 0x39, 0x02, 0x00, 0x00,
// 350 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xcf, 0x6b, 0xdb, 0x30,
0x14, 0x80, 0xf1, 0xb2, 0x64, 0x89, 0xf2, 0x73, 0x5a, 0xc2, 0xc4, 0x2e, 0x33, 0xdb, 0xc5, 0x94,
0x10, 0x97, 0x96, 0xfe, 0x03, 0xe9, 0xa5, 0x81, 0x16, 0x82, 0x43, 0x2f, 0xbd, 0x08, 0xc7, 0x56,
0x1d, 0x81, 0xab, 0xe7, 0x48, 0xb2, 0x21, 0x7f, 0x79, 0xaf, 0xc5, 0x4f, 0x89, 0x1b, 0x42, 0x6f,
0xf6, 0xf7, 0x7d, 0x3c, 0x49, 0xf0, 0xc8, 0x4c, 0x8b, 0x22, 0x97, 0x49, 0x6c, 0x25, 0xa8, 0x34,
0xb6, 0xf1, 0xa2, 0xd0, 0x60, 0x81, 0x8e, 0x2f, 0xf0, 0xbf, 0xf7, 0x16, 0xe9, 0x6c, 0x6c, 0x6c,
0x4b, 0x43, 0xff, 0x90, 0x6e, 0x01, 0x46, 0xd6, 0x8a, 0x79, 0xbe, 0x17, 0xf4, 0xa2, 0xe6, 0x9f,
0xce, 0x09, 0xd5, 0x22, 0x8f, 0x0f, 0x3c, 0x87, 0x8c, 0x37, 0x55, 0x17, 0xab, 0x09, 0x9a, 0x47,
0xc8, 0xd6, 0xa7, 0xfa, 0x3f, 0x19, 0xbe, 0xca, 0x5c, 0x7c, 0x86, 0x3d, 0x0c, 0x07, 0x35, 0x6c,
0xa2, 0x3b, 0xf2, 0x1b, 0xa3, 0x2f, 0xe6, 0x12, 0xcc, 0xa7, 0xb5, 0x8e, 0x2e, 0x67, 0x07, 0x64,
0xf2, 0x16, 0x1b, 0x2b, 0x34, 0x37, 0x42, 0x57, 0x42, 0x73, 0x99, 0xb2, 0xbe, 0xef, 0x05, 0xc3,
0x68, 0xe4, 0xf8, 0x06, 0xf1, 0x2a, 0xad, 0x4b, 0x93, 0xc7, 0x95, 0xe0, 0x12, 0xb8, 0x2e, 0x95,
0x92, 0x2a, 0x63, 0xdf, 0x7c, 0x2f, 0xe8, 0x46, 0x23, 0xe4, 0x2b, 0x88, 0x1c, 0xa5, 0x57, 0xe4,
0xa7, 0x2b, 0xcd, 0x3e, 0x6f, 0xd2, 0x16, 0xa6, 0x63, 0x14, 0x9b, 0x7d, 0x7e, 0x6a, 0x6f, 0xc8,
0xcc, 0x88, 0x04, 0x54, 0x6a, 0xf8, 0x56, 0xec, 0xa4, 0x4a, 0xb9, 0x3b, 0x96, 0x7d, 0xc7, 0x4b,
0xfc, 0x3a, 0xca, 0x25, 0xba, 0x27, 0x54, 0xf4, 0x2f, 0xe9, 0x1f, 0xef, 0xbc, 0x03, 0x63, 0x59,
0x1b, 0x9f, 0x47, 0x1c, 0x7a, 0x00, 0x63, 0xcf, 0x82, 0x02, 0xb4, 0x65, 0x1d, 0xdf, 0x0b, 0xda,
0xa7, 0x60, 0x0d, 0xda, 0xd2, 0x6b, 0x32, 0x3d, 0x06, 0x09, 0x28, 0x25, 0x12, 0xcb, 0xb5, 0xb0,
0xfa, 0xc0, 0x7e, 0x60, 0x49, 0x9d, 0xbb, 0x77, 0x2a, 0xaa, 0xcd, 0xd9, 0xc8, 0xb2, 0x94, 0x29,
0x1b, 0x9c, 0x9f, 0xf9, 0x5c, 0xca, 0x74, 0xb9, 0x78, 0x99, 0x57, 0xd2, 0x0a, 0x63, 0x16, 0x12,
0x42, 0xf7, 0x15, 0x66, 0x10, 0x56, 0x36, 0xc4, 0x55, 0x09, 0x2f, 0x36, 0x65, 0xdb, 0x41, 0x7c,
0xfb, 0x11, 0x00, 0x00, 0xff, 0xff, 0x42, 0xb5, 0xb6, 0xf7, 0x5a, 0x02, 0x00, 0x00,
}

Просмотреть файл

@ -36,4 +36,5 @@ message Status {
string master_host = 5;
int32 master_port = 6;
int32 master_connect_retry = 7;
string master_uuid = 12;
}