Merge pull request #923 from alainjobart/resharding

Now using strings in remote RPC communication for replication position.
This commit is contained in:
Alain Jobart 2015-07-24 12:21:16 -07:00
Родитель 92fafb49da cd69f52256
Коммит ff3aa3b242
10 изменённых файлов: 312 добавлений и 779 удалений

Просмотреть файл

@ -5,6 +5,8 @@
package proto
import (
"fmt"
mproto "github.com/youtube/vitess/go/mysql/proto"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -115,15 +117,19 @@ func ProtoToBinlogTransaction(bt *pb.BinlogTransaction) *BinlogTransaction {
func BlpPositionToProto(b *BlpPosition) *pbt.BlpPosition {
return &pbt.BlpPosition{
Uid: b.Uid,
Position: myproto.ReplicationPositionToProto(b.Position),
Position: myproto.EncodeReplicationPosition(b.Position),
}
}
// ProtoToBlpPosition converts a proto to a BlpPosition
func ProtoToBlpPosition(b *pbt.BlpPosition) *BlpPosition {
pos, err := myproto.DecodeReplicationPosition(b.Position)
if err != nil {
panic(fmt.Errorf("cannot decode position: %v", err))
}
return &BlpPosition{
Uid: b.Uid,
Position: myproto.ProtoToReplicationPosition(b.Position),
Position: pos,
}
}

Просмотреть файл

@ -11,85 +11,11 @@ import (
pbt "github.com/youtube/vitess/go/vt/proto/tabletmanagerdata"
)
// ReplicationPositionToProto translates a ReplicationPosition to
// proto, or panics
func ReplicationPositionToProto(rp ReplicationPosition) *pb.Position {
switch gtid := rp.GTIDSet.(type) {
case MariadbGTID:
return &pb.Position{
MariadbGtid: &pb.MariadbGtid{
Domain: gtid.Domain,
Server: gtid.Server,
Sequence: gtid.Sequence,
},
}
case Mysql56GTIDSet:
result := &pb.Position{
MysqlGtidSet: &pb.MysqlGtidSet{},
}
for k, v := range gtid {
s := &pb.MysqlGtidSet_MysqlUuidSet{
Interval: make([]*pb.MysqlGtidSet_MysqlInterval, len(v)),
}
s.Uuid = make([]byte, len(k))
for i, b := range k {
s.Uuid[i] = b
}
for i, in := range v {
s.Interval[i] = &pb.MysqlGtidSet_MysqlInterval{
First: uint64(in.start),
Last: uint64(in.end),
}
}
result.MysqlGtidSet.UuidSet = append(result.MysqlGtidSet.UuidSet, s)
}
return result
default:
panic(fmt.Errorf("can't convert ReplicationPosition to proto: %#v", rp))
}
}
// ProtoToReplicationPosition translates a proto ReplicationPosition, or panics
func ProtoToReplicationPosition(rp *pb.Position) ReplicationPosition {
if rp.MariadbGtid != nil {
return ReplicationPosition{
GTIDSet: MariadbGTID{
Domain: rp.MariadbGtid.Domain,
Server: rp.MariadbGtid.Server,
Sequence: rp.MariadbGtid.Sequence,
},
}
}
if rp.MysqlGtidSet != nil {
m := Mysql56GTIDSet(make(map[SID][]interval))
for _, s := range rp.MysqlGtidSet.UuidSet {
if len(s.Uuid) != 16 {
panic(fmt.Errorf("invalid MysqlGtidSet Uuid length: %v", len(s.Uuid)))
}
var sid SID
for i, b := range s.Uuid {
sid[i] = b
}
ins := make([]interval, len(s.Interval))
for i, in := range s.Interval {
ins[i].start = int64(in.First)
ins[i].end = int64(in.Last)
}
m[sid] = ins
}
return ReplicationPosition{
GTIDSet: m,
}
}
panic(fmt.Errorf("can't convert ReplicationPosition from proto: %#v", rp))
}
// ReplicationStatusToProto translates a ReplicationStatus to
// proto, or panics
func ReplicationStatusToProto(r ReplicationStatus) *pb.Status {
return &pb.Status{
Position: ReplicationPositionToProto(r.Position),
Position: EncodeReplicationPosition(r.Position),
SlaveIoRunning: r.SlaveIORunning,
SlaveSqlRunning: r.SlaveSQLRunning,
SecondsBehindMaster: uint32(r.SecondsBehindMaster),
@ -101,8 +27,12 @@ func ReplicationStatusToProto(r ReplicationStatus) *pb.Status {
// ProtoToReplicationStatus translates a proto ReplicationStatus, or panics
func ProtoToReplicationStatus(r *pb.Status) ReplicationStatus {
pos, err := DecodeReplicationPosition(r.Position)
if err != nil {
panic(fmt.Errorf("cannot decode Position: %v", err))
}
return ReplicationStatus{
Position: ProtoToReplicationPosition(r.Position),
Position: pos,
SlaveIORunning: r.SlaveIoRunning,
SlaveSQLRunning: r.SlaveSqlRunning,
SecondsBehindMaster: uint(r.SecondsBehindMaster),

Просмотреть файл

@ -9,9 +9,6 @@ It is generated from these files:
replicationdata.proto
It has these top-level messages:
MariadbGtid
MysqlGtidSet
Position
Status
*/
package replicationdata
@ -21,103 +18,18 @@ import proto "github.com/golang/protobuf/proto"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
// MariaDB 10.0
type MariadbGtid struct {
Domain uint32 `protobuf:"varint,1,opt,name=domain" json:"domain,omitempty"`
Server uint32 `protobuf:"varint,2,opt,name=server" json:"server,omitempty"`
Sequence uint64 `protobuf:"varint,3,opt,name=sequence" json:"sequence,omitempty"`
}
func (m *MariadbGtid) Reset() { *m = MariadbGtid{} }
func (m *MariadbGtid) String() string { return proto.CompactTextString(m) }
func (*MariadbGtid) ProtoMessage() {}
// MySQL 5.6
type MysqlGtidSet struct {
UuidSet []*MysqlGtidSet_MysqlUuidSet `protobuf:"bytes,1,rep,name=uuid_set" json:"uuid_set,omitempty"`
}
func (m *MysqlGtidSet) Reset() { *m = MysqlGtidSet{} }
func (m *MysqlGtidSet) String() string { return proto.CompactTextString(m) }
func (*MysqlGtidSet) ProtoMessage() {}
func (m *MysqlGtidSet) GetUuidSet() []*MysqlGtidSet_MysqlUuidSet {
if m != nil {
return m.UuidSet
}
return nil
}
type MysqlGtidSet_MysqlInterval struct {
First uint64 `protobuf:"varint,1,opt,name=first" json:"first,omitempty"`
Last uint64 `protobuf:"varint,2,opt,name=last" json:"last,omitempty"`
}
func (m *MysqlGtidSet_MysqlInterval) Reset() { *m = MysqlGtidSet_MysqlInterval{} }
func (m *MysqlGtidSet_MysqlInterval) String() string { return proto.CompactTextString(m) }
func (*MysqlGtidSet_MysqlInterval) ProtoMessage() {}
type MysqlGtidSet_MysqlUuidSet struct {
Uuid []byte `protobuf:"bytes,1,opt,name=uuid,proto3" json:"uuid,omitempty"`
Interval []*MysqlGtidSet_MysqlInterval `protobuf:"bytes,2,rep,name=interval" json:"interval,omitempty"`
}
func (m *MysqlGtidSet_MysqlUuidSet) Reset() { *m = MysqlGtidSet_MysqlUuidSet{} }
func (m *MysqlGtidSet_MysqlUuidSet) String() string { return proto.CompactTextString(m) }
func (*MysqlGtidSet_MysqlUuidSet) ProtoMessage() {}
func (m *MysqlGtidSet_MysqlUuidSet) GetInterval() []*MysqlGtidSet_MysqlInterval {
if m != nil {
return m.Interval
}
return nil
}
// Position represents the information required to specify where to start
// replication. The contents vary depending on the flavor of MySQL in use.
// We define all the fields here and use only the ones we need for each flavor.
type Position struct {
MariadbGtid *MariadbGtid `protobuf:"bytes,1,opt,name=mariadb_gtid" json:"mariadb_gtid,omitempty"`
MysqlGtidSet *MysqlGtidSet `protobuf:"bytes,2,opt,name=mysql_gtid_set" json:"mysql_gtid_set,omitempty"`
}
func (m *Position) Reset() { *m = Position{} }
func (m *Position) String() string { return proto.CompactTextString(m) }
func (*Position) ProtoMessage() {}
func (m *Position) GetMariadbGtid() *MariadbGtid {
if m != nil {
return m.MariadbGtid
}
return nil
}
func (m *Position) GetMysqlGtidSet() *MysqlGtidSet {
if m != nil {
return m.MysqlGtidSet
}
return nil
}
// Status is the replication status for MySQL (returned by 'show slave status'
// and parsed into a Position and fields).
type Status struct {
Position *Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
SlaveIoRunning bool `protobuf:"varint,2,opt,name=slave_io_running" json:"slave_io_running,omitempty"`
SlaveSqlRunning bool `protobuf:"varint,3,opt,name=slave_sql_running" json:"slave_sql_running,omitempty"`
SecondsBehindMaster uint32 `protobuf:"varint,4,opt,name=seconds_behind_master" json:"seconds_behind_master,omitempty"`
MasterHost string `protobuf:"bytes,5,opt,name=master_host" json:"master_host,omitempty"`
MasterPort int32 `protobuf:"varint,6,opt,name=master_port" json:"master_port,omitempty"`
MasterConnectRetry int32 `protobuf:"varint,7,opt,name=master_connect_retry" json:"master_connect_retry,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
SlaveIoRunning bool `protobuf:"varint,2,opt,name=slave_io_running" json:"slave_io_running,omitempty"`
SlaveSqlRunning bool `protobuf:"varint,3,opt,name=slave_sql_running" json:"slave_sql_running,omitempty"`
SecondsBehindMaster uint32 `protobuf:"varint,4,opt,name=seconds_behind_master" json:"seconds_behind_master,omitempty"`
MasterHost string `protobuf:"bytes,5,opt,name=master_host" json:"master_host,omitempty"`
MasterPort int32 `protobuf:"varint,6,opt,name=master_port" json:"master_port,omitempty"`
MasterConnectRetry int32 `protobuf:"varint,7,opt,name=master_connect_retry" json:"master_connect_retry,omitempty"`
}
func (m *Status) Reset() { *m = Status{} }
func (m *Status) String() string { return proto.CompactTextString(m) }
func (*Status) ProtoMessage() {}
func (m *Status) GetPosition() *Position {
if m != nil {
return m.Position
}
return nil
}

Просмотреть файл

@ -241,21 +241,14 @@ func (m *Permissions) GetHostPermissions() []*HostPermission {
// BlpPosition is a replication position for a given binlog player
type BlpPosition struct {
Uid uint32 `protobuf:"varint,1,opt,name=uid" json:"uid,omitempty"`
Position *replicationdata.Position `protobuf:"bytes,2,opt,name=position" json:"position,omitempty"`
Uid uint32 `protobuf:"varint,1,opt,name=uid" json:"uid,omitempty"`
Position string `protobuf:"bytes,2,opt,name=position" json:"position,omitempty"`
}
func (m *BlpPosition) Reset() { *m = BlpPosition{} }
func (m *BlpPosition) String() string { return proto.CompactTextString(m) }
func (*BlpPosition) ProtoMessage() {}
func (m *BlpPosition) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type PingRequest struct {
Payload string `protobuf:"bytes,1,opt,name=payload" json:"payload,omitempty"`
}
@ -625,20 +618,13 @@ func (m *MasterPositionRequest) String() string { return proto.CompactTextString
func (*MasterPositionRequest) ProtoMessage() {}
type MasterPositionResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *MasterPositionResponse) Reset() { *m = MasterPositionResponse{} }
func (m *MasterPositionResponse) String() string { return proto.CompactTextString(m) }
func (*MasterPositionResponse) ProtoMessage() {}
func (m *MasterPositionResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type StopSlaveRequest struct {
}
@ -654,36 +640,22 @@ func (m *StopSlaveResponse) String() string { return proto.CompactTextString(m)
func (*StopSlaveResponse) ProtoMessage() {}
type StopSlaveMinimumRequest struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
WaitTimeout int64 `protobuf:"varint,2,opt,name=wait_timeout" json:"wait_timeout,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
WaitTimeout int64 `protobuf:"varint,2,opt,name=wait_timeout" json:"wait_timeout,omitempty"`
}
func (m *StopSlaveMinimumRequest) Reset() { *m = StopSlaveMinimumRequest{} }
func (m *StopSlaveMinimumRequest) String() string { return proto.CompactTextString(m) }
func (*StopSlaveMinimumRequest) ProtoMessage() {}
func (m *StopSlaveMinimumRequest) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type StopSlaveMinimumResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *StopSlaveMinimumResponse) Reset() { *m = StopSlaveMinimumResponse{} }
func (m *StopSlaveMinimumResponse) String() string { return proto.CompactTextString(m) }
func (*StopSlaveMinimumResponse) ProtoMessage() {}
func (m *StopSlaveMinimumResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type StartSlaveRequest struct {
}
@ -821,20 +793,13 @@ func (m *RunBlpUntilRequest) GetBlpPositions() []*BlpPosition {
}
type RunBlpUntilResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *RunBlpUntilResponse) Reset() { *m = RunBlpUntilResponse{} }
func (m *RunBlpUntilResponse) String() string { return proto.CompactTextString(m) }
func (*RunBlpUntilResponse) ProtoMessage() {}
func (m *RunBlpUntilResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type ResetReplicationRequest struct {
}
@ -857,25 +822,18 @@ func (m *InitMasterRequest) String() string { return proto.CompactTextString(m)
func (*InitMasterRequest) ProtoMessage() {}
type InitMasterResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *InitMasterResponse) Reset() { *m = InitMasterResponse{} }
func (m *InitMasterResponse) String() string { return proto.CompactTextString(m) }
func (*InitMasterResponse) ProtoMessage() {}
func (m *InitMasterResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type PopulateReparentJournalRequest struct {
TimeCreatedNs int64 `protobuf:"varint,1,opt,name=time_created_ns" json:"time_created_ns,omitempty"`
ActionName string `protobuf:"bytes,2,opt,name=action_name" json:"action_name,omitempty"`
MasterAlias *topodata.TabletAlias `protobuf:"bytes,3,opt,name=master_alias" json:"master_alias,omitempty"`
ReplicationPosition *replicationdata.Position `protobuf:"bytes,4,opt,name=replication_position" json:"replication_position,omitempty"`
TimeCreatedNs int64 `protobuf:"varint,1,opt,name=time_created_ns" json:"time_created_ns,omitempty"`
ActionName string `protobuf:"bytes,2,opt,name=action_name" json:"action_name,omitempty"`
MasterAlias *topodata.TabletAlias `protobuf:"bytes,3,opt,name=master_alias" json:"master_alias,omitempty"`
ReplicationPosition string `protobuf:"bytes,4,opt,name=replication_position" json:"replication_position,omitempty"`
}
func (m *PopulateReparentJournalRequest) Reset() { *m = PopulateReparentJournalRequest{} }
@ -889,13 +847,6 @@ func (m *PopulateReparentJournalRequest) GetMasterAlias() *topodata.TabletAlias
return nil
}
func (m *PopulateReparentJournalRequest) GetReplicationPosition() *replicationdata.Position {
if m != nil {
return m.ReplicationPosition
}
return nil
}
type PopulateReparentJournalResponse struct {
}
@ -904,9 +855,9 @@ func (m *PopulateReparentJournalResponse) String() string { return proto.Compact
func (*PopulateReparentJournalResponse) ProtoMessage() {}
type InitSlaveRequest struct {
Parent *topodata.TabletAlias `protobuf:"bytes,1,opt,name=parent" json:"parent,omitempty"`
ReplicationPosition *replicationdata.Position `protobuf:"bytes,2,opt,name=replication_position" json:"replication_position,omitempty"`
TimeCreatedNs int64 `protobuf:"varint,3,opt,name=time_created_ns" json:"time_created_ns,omitempty"`
Parent *topodata.TabletAlias `protobuf:"bytes,1,opt,name=parent" json:"parent,omitempty"`
ReplicationPosition string `protobuf:"bytes,2,opt,name=replication_position" json:"replication_position,omitempty"`
TimeCreatedNs int64 `protobuf:"varint,3,opt,name=time_created_ns" json:"time_created_ns,omitempty"`
}
func (m *InitSlaveRequest) Reset() { *m = InitSlaveRequest{} }
@ -920,13 +871,6 @@ func (m *InitSlaveRequest) GetParent() *topodata.TabletAlias {
return nil
}
func (m *InitSlaveRequest) GetReplicationPosition() *replicationdata.Position {
if m != nil {
return m.ReplicationPosition
}
return nil
}
type InitSlaveResponse struct {
}
@ -942,50 +886,29 @@ func (m *DemoteMasterRequest) String() string { return proto.CompactTextString(m
func (*DemoteMasterRequest) ProtoMessage() {}
type DemoteMasterResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *DemoteMasterResponse) Reset() { *m = DemoteMasterResponse{} }
func (m *DemoteMasterResponse) String() string { return proto.CompactTextString(m) }
func (*DemoteMasterResponse) ProtoMessage() {}
func (m *DemoteMasterResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type PromoteSlaveWhenCaughtUpRequest struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *PromoteSlaveWhenCaughtUpRequest) Reset() { *m = PromoteSlaveWhenCaughtUpRequest{} }
func (m *PromoteSlaveWhenCaughtUpRequest) String() string { return proto.CompactTextString(m) }
func (*PromoteSlaveWhenCaughtUpRequest) ProtoMessage() {}
func (m *PromoteSlaveWhenCaughtUpRequest) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type PromoteSlaveWhenCaughtUpResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *PromoteSlaveWhenCaughtUpResponse) Reset() { *m = PromoteSlaveWhenCaughtUpResponse{} }
func (m *PromoteSlaveWhenCaughtUpResponse) String() string { return proto.CompactTextString(m) }
func (*PromoteSlaveWhenCaughtUpResponse) ProtoMessage() {}
func (m *PromoteSlaveWhenCaughtUpResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type SlaveWasPromotedRequest struct {
}
@ -1077,20 +1000,13 @@ func (m *PromoteSlaveRequest) String() string { return proto.CompactTextString(m
func (*PromoteSlaveRequest) ProtoMessage() {}
type PromoteSlaveResponse struct {
Position *replicationdata.Position `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
Position string `protobuf:"bytes,1,opt,name=position" json:"position,omitempty"`
}
func (m *PromoteSlaveResponse) Reset() { *m = PromoteSlaveResponse{} }
func (m *PromoteSlaveResponse) String() string { return proto.CompactTextString(m) }
func (*PromoteSlaveResponse) ProtoMessage() {}
func (m *PromoteSlaveResponse) GetPosition() *replicationdata.Position {
if m != nil {
return m.Position
}
return nil
}
type BackupRequest struct {
Concurrency int64 `protobuf:"varint,1,opt,name=concurrency" json:"concurrency,omitempty"`
}

Просмотреть файл

@ -350,7 +350,11 @@ func (client *Client) MasterPosition(ctx context.Context, tablet *topo.TabletInf
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
// StopSlave is part of the tmclient.TabletManagerClient interface
@ -372,13 +376,17 @@ func (client *Client) StopSlaveMinimum(ctx context.Context, tablet *topo.TabletI
}
defer cc.Close()
response, err := c.StopSlaveMinimum(ctx, &pb.StopSlaveMinimumRequest{
Position: myproto.ReplicationPositionToProto(minPos),
Position: myproto.EncodeReplicationPosition(minPos),
WaitTimeout: int64(waitTime),
})
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
// StartSlave is part of the tmclient.TabletManagerClient interface
@ -472,7 +480,11 @@ func (client *Client) RunBlpUntil(ctx context.Context, tablet *topo.TabletInfo,
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), nil
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
//
@ -501,7 +513,11 @@ func (client *Client) InitMaster(ctx context.Context, tablet *topo.TabletInfo) (
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
// PopulateReparentJournal is part of the tmclient.TabletManagerClient interface
@ -515,7 +531,7 @@ func (client *Client) PopulateReparentJournal(ctx context.Context, tablet *topo.
TimeCreatedNs: timeCreatedNS,
ActionName: actionName,
MasterAlias: topo.TabletAliasToProto(masterAlias),
ReplicationPosition: myproto.ReplicationPositionToProto(pos),
ReplicationPosition: myproto.EncodeReplicationPosition(pos),
})
return err
}
@ -529,7 +545,7 @@ func (client *Client) InitSlave(ctx context.Context, tablet *topo.TabletInfo, pa
defer cc.Close()
_, err = c.InitSlave(ctx, &pb.InitSlaveRequest{
Parent: topo.TabletAliasToProto(parent),
ReplicationPosition: myproto.ReplicationPositionToProto(replicationPosition),
ReplicationPosition: myproto.EncodeReplicationPosition(replicationPosition),
TimeCreatedNs: timeCreatedNS,
})
return err
@ -546,7 +562,11 @@ func (client *Client) DemoteMaster(ctx context.Context, tablet *topo.TabletInfo)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
// PromoteSlaveWhenCaughtUp is part of the tmclient.TabletManagerClient interface
@ -557,12 +577,16 @@ func (client *Client) PromoteSlaveWhenCaughtUp(ctx context.Context, tablet *topo
}
defer cc.Close()
response, err := c.PromoteSlaveWhenCaughtUp(ctx, &pb.PromoteSlaveWhenCaughtUpRequest{
Position: myproto.ReplicationPositionToProto(pos),
Position: myproto.EncodeReplicationPosition(pos),
})
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
// SlaveWasPromoted is part of the tmclient.TabletManagerClient interface
@ -629,7 +653,11 @@ func (client *Client) PromoteSlave(ctx context.Context, tablet *topo.TabletInfo)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return myproto.ProtoToReplicationPosition(response.Position), err
position, err := myproto.DecodeReplicationPosition(response.Position)
if err != nil {
return myproto.ReplicationPosition{}, err
}
return position, err
}
//

Просмотреть файл

@ -232,7 +232,7 @@ func (s *server) MasterPosition(ctx context.Context, request *pb.MasterPositionR
return response, s.agent.RPCWrap(ctx, actionnode.TabletActionMasterPosition, request, response, func() error {
position, err := s.agent.MasterPosition(ctx)
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})
@ -250,9 +250,13 @@ func (s *server) StopSlaveMinimum(ctx context.Context, request *pb.StopSlaveMini
ctx = callinfo.GRPCCallInfo(ctx)
response := &pb.StopSlaveMinimumResponse{}
return response, s.agent.RPCWrapLock(ctx, actionnode.TabletActionStopSlaveMinimum, request, response, true, func() error {
position, err := s.agent.StopSlaveMinimum(ctx, myproto.ProtoToReplicationPosition(request.Position), time.Duration(request.WaitTimeout))
position, err := myproto.DecodeReplicationPosition(request.Position)
if err != nil {
return err
}
position, err = s.agent.StopSlaveMinimum(ctx, position, time.Duration(request.WaitTimeout))
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})
@ -324,7 +328,7 @@ func (s *server) RunBlpUntil(ctx context.Context, request *pb.RunBlpUntilRequest
return response, s.agent.RPCWrapLock(ctx, actionnode.TabletActionRunBLPUntil, request, response, true, func() error {
position, err := s.agent.RunBlpUntil(ctx, blproto.ProtoToBlpPositionList(request.BlpPositions), time.Duration(request.WaitTimeout))
if err == nil {
response.Position = myproto.ReplicationPositionToProto(*position)
response.Position = myproto.EncodeReplicationPosition(*position)
}
return err
})
@ -348,7 +352,7 @@ func (s *server) InitMaster(ctx context.Context, request *pb.InitMasterRequest)
return response, s.agent.RPCWrapLockAction(ctx, actionnode.TabletActionInitMaster, request, response, true, func() error {
position, err := s.agent.InitMaster(ctx)
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})
@ -358,7 +362,11 @@ func (s *server) PopulateReparentJournal(ctx context.Context, request *pb.Popula
ctx = callinfo.GRPCCallInfo(ctx)
response := &pb.PopulateReparentJournalResponse{}
return response, s.agent.RPCWrap(ctx, actionnode.TabletActionPopulateReparentJournal, request, response, func() error {
return s.agent.PopulateReparentJournal(ctx, request.TimeCreatedNs, request.ActionName, topo.ProtoToTabletAlias(request.MasterAlias), myproto.ProtoToReplicationPosition(request.ReplicationPosition))
position, err := myproto.DecodeReplicationPosition(request.ReplicationPosition)
if err != nil {
return err
}
return s.agent.PopulateReparentJournal(ctx, request.TimeCreatedNs, request.ActionName, topo.ProtoToTabletAlias(request.MasterAlias), position)
})
}
@ -366,7 +374,11 @@ func (s *server) InitSlave(ctx context.Context, request *pb.InitSlaveRequest) (*
ctx = callinfo.GRPCCallInfo(ctx)
response := &pb.InitSlaveResponse{}
return response, s.agent.RPCWrapLockAction(ctx, actionnode.TabletActionInitSlave, request, response, true, func() error {
return s.agent.InitSlave(ctx, topo.ProtoToTabletAlias(request.Parent), myproto.ProtoToReplicationPosition(request.ReplicationPosition), request.TimeCreatedNs)
position, err := myproto.DecodeReplicationPosition(request.ReplicationPosition)
if err != nil {
return err
}
return s.agent.InitSlave(ctx, topo.ProtoToTabletAlias(request.Parent), position, request.TimeCreatedNs)
})
}
@ -376,7 +388,7 @@ func (s *server) DemoteMaster(ctx context.Context, request *pb.DemoteMasterReque
return response, s.agent.RPCWrapLockAction(ctx, actionnode.TabletActionDemoteMaster, request, response, true, func() error {
position, err := s.agent.DemoteMaster(ctx)
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})
@ -386,9 +398,13 @@ func (s *server) PromoteSlaveWhenCaughtUp(ctx context.Context, request *pb.Promo
ctx = callinfo.GRPCCallInfo(ctx)
response := &pb.PromoteSlaveWhenCaughtUpResponse{}
return response, s.agent.RPCWrapLockAction(ctx, actionnode.TabletActionPromoteSlaveWhenCaughtUp, request, response, true, func() error {
position, err := s.agent.PromoteSlaveWhenCaughtUp(ctx, myproto.ProtoToReplicationPosition(request.Position))
position, err := myproto.DecodeReplicationPosition(request.Position)
if err != nil {
return err
}
position, err = s.agent.PromoteSlaveWhenCaughtUp(ctx, position)
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})
@ -438,7 +454,7 @@ func (s *server) PromoteSlave(ctx context.Context, request *pb.PromoteSlaveReque
return response, s.agent.RPCWrapLockAction(ctx, actionnode.TabletActionPromoteSlave, request, response, true, func() error {
position, err := s.agent.PromoteSlave(ctx)
if err == nil {
response.Position = myproto.ReplicationPositionToProto(position)
response.Position = myproto.EncodeReplicationPosition(position)
}
return err
})

Просмотреть файл

@ -4,38 +4,10 @@ syntax = "proto3";
package replicationdata;
// MariaDB 10.0
message MariadbGtid {
uint32 domain = 1;
uint32 server = 2;
uint64 sequence = 3;
}
// MySQL 5.6
message MysqlGtidSet {
message MysqlInterval {
uint64 first = 1;
uint64 last = 2;
}
message MysqlUuidSet {
bytes uuid = 1;
repeated MysqlInterval interval = 2;
}
repeated MysqlUuidSet uuid_set = 1;
}
// Position represents the information required to specify where to start
// replication. The contents vary depending on the flavor of MySQL in use.
// We define all the fields here and use only the ones we need for each flavor.
message Position {
MariadbGtid mariadb_gtid = 1;
MysqlGtidSet mysql_gtid_set = 2;
}
// Status is the replication status for MySQL (returned by 'show slave status'
// and parsed into a Position and fields).
message Status {
Position position = 1;
string position = 1;
bool slave_io_running = 2;
bool slave_sql_running = 3;
uint32 seconds_behind_master = 4;

Просмотреть файл

@ -81,7 +81,7 @@ message Permissions {
// BlpPosition is a replication position for a given binlog player
message BlpPosition {
uint32 uid = 1;
replicationdata.Position position = 2;
string position = 2;
}
//
@ -233,7 +233,7 @@ message MasterPositionRequest {
}
message MasterPositionResponse {
replicationdata.Position position = 1;
string position = 1;
}
message StopSlaveRequest {
@ -243,12 +243,12 @@ message StopSlaveResponse {
}
message StopSlaveMinimumRequest {
replicationdata.Position position = 1;
string position = 1;
int64 wait_timeout = 2;
}
message StopSlaveMinimumResponse {
replicationdata.Position position = 1;
string position = 1;
}
message StartSlaveRequest {
@ -307,7 +307,7 @@ message RunBlpUntilRequest {
}
message RunBlpUntilResponse {
replicationdata.Position position = 1;
string position = 1;
}
message ResetReplicationRequest {
@ -320,14 +320,14 @@ message InitMasterRequest {
}
message InitMasterResponse {
replicationdata.Position position = 1;
string position = 1;
}
message PopulateReparentJournalRequest {
int64 time_created_ns = 1;
string action_name = 2;
topodata.TabletAlias master_alias = 3;
replicationdata.Position replication_position = 4;
string replication_position = 4;
}
message PopulateReparentJournalResponse {
@ -335,7 +335,7 @@ message PopulateReparentJournalResponse {
message InitSlaveRequest {
topodata.TabletAlias parent = 1;
replicationdata.Position replication_position = 2;
string replication_position = 2;
int64 time_created_ns = 3;
}
@ -346,15 +346,15 @@ message DemoteMasterRequest {
}
message DemoteMasterResponse {
replicationdata.Position position = 1;
string position = 1;
}
message PromoteSlaveWhenCaughtUpRequest {
replicationdata.Position position = 1;
string position = 1;
}
message PromoteSlaveWhenCaughtUpResponse {
replicationdata.Position position = 1;
string position = 1;
}
message SlaveWasPromotedRequest {
@ -391,7 +391,7 @@ message PromoteSlaveRequest {
}
message PromoteSlaveResponse {
replicationdata.Position position = 1;
string position = 1;
}
// Backup / Restore related messages

Просмотреть файл

@ -19,201 +19,13 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='replicationdata.proto',
package='replicationdata',
syntax='proto3',
serialized_pb=_b('\n\x15replicationdata.proto\x12\x0freplicationdata\"?\n\x0bMariadbGtid\x12\x0e\n\x06\x64omain\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\r\x12\x10\n\x08sequence\x18\x03 \x01(\x04\"\xd7\x01\n\x0cMysqlGtidSet\x12<\n\x08uuid_set\x18\x01 \x03(\x0b\x32*.replicationdata.MysqlGtidSet.MysqlUuidSet\x1a,\n\rMysqlInterval\x12\r\n\x05\x66irst\x18\x01 \x01(\x04\x12\x0c\n\x04last\x18\x02 \x01(\x04\x1a[\n\x0cMysqlUuidSet\x12\x0c\n\x04uuid\x18\x01 \x01(\x0c\x12=\n\x08interval\x18\x02 \x03(\x0b\x32+.replicationdata.MysqlGtidSet.MysqlInterval\"u\n\x08Position\x12\x32\n\x0cmariadb_gtid\x18\x01 \x01(\x0b\x32\x1c.replicationdata.MariadbGtid\x12\x35\n\x0emysql_gtid_set\x18\x02 \x01(\x0b\x32\x1d.replicationdata.MysqlGtidSet\"\xd1\x01\n\x06Status\x12+\n\x08position\x18\x01 \x01(\x0b\x32\x19.replicationdata.Position\x12\x18\n\x10slave_io_running\x18\x02 \x01(\x08\x12\x19\n\x11slave_sql_running\x18\x03 \x01(\x08\x12\x1d\n\x15seconds_behind_master\x18\x04 \x01(\r\x12\x13\n\x0bmaster_host\x18\x05 \x01(\t\x12\x13\n\x0bmaster_port\x18\x06 \x01(\x05\x12\x1c\n\x14master_connect_retry\x18\x07 \x01(\x05\x62\x06proto3')
serialized_pb=_b('\n\x15replicationdata.proto\x12\x0freplicationdata\"\xb6\x01\n\x06Status\x12\x10\n\x08position\x18\x01 \x01(\t\x12\x18\n\x10slave_io_running\x18\x02 \x01(\x08\x12\x19\n\x11slave_sql_running\x18\x03 \x01(\x08\x12\x1d\n\x15seconds_behind_master\x18\x04 \x01(\r\x12\x13\n\x0bmaster_host\x18\x05 \x01(\t\x12\x13\n\x0bmaster_port\x18\x06 \x01(\x05\x12\x1c\n\x14master_connect_retry\x18\x07 \x01(\x05\x62\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_MARIADBGTID = _descriptor.Descriptor(
name='MariadbGtid',
full_name='replicationdata.MariadbGtid',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='domain', full_name='replicationdata.MariadbGtid.domain', index=0,
number=1, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='server', full_name='replicationdata.MariadbGtid.server', index=1,
number=2, type=13, cpp_type=3, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='sequence', full_name='replicationdata.MariadbGtid.sequence', index=2,
number=3, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=42,
serialized_end=105,
)
_MYSQLGTIDSET_MYSQLINTERVAL = _descriptor.Descriptor(
name='MysqlInterval',
full_name='replicationdata.MysqlGtidSet.MysqlInterval',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='first', full_name='replicationdata.MysqlGtidSet.MysqlInterval.first', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='last', full_name='replicationdata.MysqlGtidSet.MysqlInterval.last', index=1,
number=2, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=186,
serialized_end=230,
)
_MYSQLGTIDSET_MYSQLUUIDSET = _descriptor.Descriptor(
name='MysqlUuidSet',
full_name='replicationdata.MysqlGtidSet.MysqlUuidSet',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='uuid', full_name='replicationdata.MysqlGtidSet.MysqlUuidSet.uuid', index=0,
number=1, type=12, cpp_type=9, label=1,
has_default_value=False, default_value=_b(""),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='interval', full_name='replicationdata.MysqlGtidSet.MysqlUuidSet.interval', index=1,
number=2, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=232,
serialized_end=323,
)
_MYSQLGTIDSET = _descriptor.Descriptor(
name='MysqlGtidSet',
full_name='replicationdata.MysqlGtidSet',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='uuid_set', full_name='replicationdata.MysqlGtidSet.uuid_set', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[_MYSQLGTIDSET_MYSQLINTERVAL, _MYSQLGTIDSET_MYSQLUUIDSET, ],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=108,
serialized_end=323,
)
_POSITION = _descriptor.Descriptor(
name='Position',
full_name='replicationdata.Position',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='mariadb_gtid', full_name='replicationdata.Position.mariadb_gtid', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='mysql_gtid_set', full_name='replicationdata.Position.mysql_gtid_set', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=325,
serialized_end=442,
)
_STATUS = _descriptor.Descriptor(
name='Status',
full_name='replicationdata.Status',
@ -223,8 +35,8 @@ _STATUS = _descriptor.Descriptor(
fields=[
_descriptor.FieldDescriptor(
name='position', full_name='replicationdata.Status.position', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
@ -282,59 +94,12 @@ _STATUS = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
serialized_start=445,
serialized_end=654,
serialized_start=43,
serialized_end=225,
)
_MYSQLGTIDSET_MYSQLINTERVAL.containing_type = _MYSQLGTIDSET
_MYSQLGTIDSET_MYSQLUUIDSET.fields_by_name['interval'].message_type = _MYSQLGTIDSET_MYSQLINTERVAL
_MYSQLGTIDSET_MYSQLUUIDSET.containing_type = _MYSQLGTIDSET
_MYSQLGTIDSET.fields_by_name['uuid_set'].message_type = _MYSQLGTIDSET_MYSQLUUIDSET
_POSITION.fields_by_name['mariadb_gtid'].message_type = _MARIADBGTID
_POSITION.fields_by_name['mysql_gtid_set'].message_type = _MYSQLGTIDSET
_STATUS.fields_by_name['position'].message_type = _POSITION
DESCRIPTOR.message_types_by_name['MariadbGtid'] = _MARIADBGTID
DESCRIPTOR.message_types_by_name['MysqlGtidSet'] = _MYSQLGTIDSET
DESCRIPTOR.message_types_by_name['Position'] = _POSITION
DESCRIPTOR.message_types_by_name['Status'] = _STATUS
MariadbGtid = _reflection.GeneratedProtocolMessageType('MariadbGtid', (_message.Message,), dict(
DESCRIPTOR = _MARIADBGTID,
__module__ = 'replicationdata_pb2'
# @@protoc_insertion_point(class_scope:replicationdata.MariadbGtid)
))
_sym_db.RegisterMessage(MariadbGtid)
MysqlGtidSet = _reflection.GeneratedProtocolMessageType('MysqlGtidSet', (_message.Message,), dict(
MysqlInterval = _reflection.GeneratedProtocolMessageType('MysqlInterval', (_message.Message,), dict(
DESCRIPTOR = _MYSQLGTIDSET_MYSQLINTERVAL,
__module__ = 'replicationdata_pb2'
# @@protoc_insertion_point(class_scope:replicationdata.MysqlGtidSet.MysqlInterval)
))
,
MysqlUuidSet = _reflection.GeneratedProtocolMessageType('MysqlUuidSet', (_message.Message,), dict(
DESCRIPTOR = _MYSQLGTIDSET_MYSQLUUIDSET,
__module__ = 'replicationdata_pb2'
# @@protoc_insertion_point(class_scope:replicationdata.MysqlGtidSet.MysqlUuidSet)
))
,
DESCRIPTOR = _MYSQLGTIDSET,
__module__ = 'replicationdata_pb2'
# @@protoc_insertion_point(class_scope:replicationdata.MysqlGtidSet)
))
_sym_db.RegisterMessage(MysqlGtidSet)
_sym_db.RegisterMessage(MysqlGtidSet.MysqlInterval)
_sym_db.RegisterMessage(MysqlGtidSet.MysqlUuidSet)
Position = _reflection.GeneratedProtocolMessageType('Position', (_message.Message,), dict(
DESCRIPTOR = _POSITION,
__module__ = 'replicationdata_pb2'
# @@protoc_insertion_point(class_scope:replicationdata.Position)
))
_sym_db.RegisterMessage(Position)
Status = _reflection.GeneratedProtocolMessageType('Status', (_message.Message,), dict(
DESCRIPTOR = _STATUS,
__module__ = 'replicationdata_pb2'

Различия файлов скрыты, потому что одна или несколько строк слишком длинны