add _vt.reparent_log to handle reparent tracking per shard

consolidate some old tables for replication
This commit is contained in:
Mike Solomon 2012-10-30 23:49:00 -07:00
Родитель 3a05110930
Коммит 8c9c4f5ae5
6 изменённых файлов: 46 добавлений и 15 удалений

Просмотреть файл

@ -1,3 +1,11 @@
CREATE DATABASE _vt;
CREATE TABLE _vt.replication_test (time_created_ns bigint primary key);
CREATE TABLE _vt.replication_log (time_created_ns bigint primary key, note varchar(255));
CREATE TABLE _vt.replication_log (
time_created_ns bigint primary key,
note varchar(255));
CREATE TABLE _vt.reparent_log (
time_created_ns bigint primary key,
last_position varchar(255),
new_position varchar(255),
index (last_position));

Просмотреть файл

@ -35,7 +35,7 @@ const (
func (mysqld *Mysqld) ValidateCloneSource() error {
slaveStatus, err := mysqld.slaveStatus()
if err != nil {
if err != ERR_NOT_SLAVE {
if err != ErrNotSlave {
return fmt.Errorf("mysqlctl: ValidateCloneSource failed, %v", err)
}
} else {
@ -182,7 +182,7 @@ func (mysqld *Mysqld) CreateSnapshot(dbName, sourceAddr string, allowHierarchica
slaveStatus, slaveErr := mysqld.slaveStatus()
if slaveErr == nil {
slaveStartRequired = (slaveStatus["Slave_IO_Running"] == "Yes" && slaveStatus["Slave_SQL_Running"] == "Yes")
} else if slaveErr == ERR_NOT_SLAVE {
} else if slaveErr == ErrNotSlave {
sourceIsMaster = true
} else {
// If we can't get any data, just fail.

Просмотреть файл

@ -30,10 +30,24 @@ func (mysqld *Mysqld) DemoteMaster() (*ReplicationPosition, error) {
//
// replicationState: info slaves need to reparent themselves
// waitPosition: slaves can wait for this position when restarting replication
// timePromoted: this timestamp (unix nanoseconds) is inserted into _vt.replication_test to verify the replication config
// timePromoted: this timestamp (unix nanoseconds) is inserted into _vt.replication_log to verify the replication config
func (mysqld *Mysqld) PromoteSlave(setReadWrite bool) (replicationState *ReplicationState, waitPosition *ReplicationPosition, timePromoted int64, err error) {
if err = mysqld.StopSlave(); err != nil {
return
}
// If we are forced, we have to get our status as a master, not a slave.
masterAddr, _ := mysqld.GetMasterAddr()
lastRepPos, err := mysqld.SlaveStatus()
if err == ErrNotSlave {
lastRepPos, err = mysqld.MasterStatus()
masterAddr = mysqld.Addr()
}
if err != nil {
return
}
cmds := []string{
"STOP SLAVE",
"RESET MASTER",
"RESET SLAVE",
}
@ -46,10 +60,15 @@ func (mysqld *Mysqld) PromoteSlave(setReadWrite bool) (replicationState *Replica
}
replicationState = NewReplicationState(mysqld.Addr(), mysqld.replParams.Uname, mysqld.replParams.Pass)
replicationState.ReplicationPosition = *replicationPosition
lastPos := masterAddr + "@" + lastRepPos.MapKey()
newPos := replicationState.MasterAddr() + "@" + replicationState.ReplicationPosition.MapKey()
timePromoted = time.Now().UnixNano()
// write a row to verify that replication is functioning
cmd := fmt.Sprintf("INSERT INTO _vt.replication_test (time_created_ns) VALUES (%v)", timePromoted)
if err = mysqld.executeSuperQuery(cmd); err != nil {
cmds = []string{
fmt.Sprintf("INSERT INTO _vt.replication_log (time_created_ns, note) VALUES (%v, 'reparent check')", timePromoted),
fmt.Sprintf("INSERT INTO _vt.reparent_log (time_created_ns, last_position, new_position) VALUES (%v, '%v', '%v')", timePromoted, lastPos, newPos),
}
if err = mysqld.executeSuperQueryList(cmds); err != nil {
return
}
// this is the wait-point for checking replication
@ -88,8 +107,8 @@ func (mysqld *Mysqld) RestartSlave(replicationState *ReplicationState, waitPosit
// Check for the magic row inserted under controlled reparenting.
func (mysqld *Mysqld) CheckReplication(timeCheck int64) error {
relog.Info("Check Slave")
checkQuery := fmt.Sprintf("SELECT * FROM _vt.replication_test WHERE time_created_ns = %v",
relog.Info("Check replication restarted")
checkQuery := fmt.Sprintf("SELECT * FROM _vt.replication_log WHERE time_created_ns = %v",
timeCheck)
rows, err := mysqld.fetchSuperQuery(checkQuery)
if err != nil {

Просмотреть файл

@ -49,6 +49,10 @@ type ReplicationState struct {
MasterConnectRetry int
}
func (rs ReplicationState) MasterAddr() string {
return fmt.Sprintf("%v:%v", rs.MasterHost, rs.MasterPort)
}
func NewReplicationState(masterAddr, user, passwd string) *ReplicationState {
addrPieces := strings.Split(masterAddr, ":")
port, err := strconv.Atoi(addrPieces[1])
@ -216,7 +220,7 @@ func (mysqld *Mysqld) SetReadOnly(on bool) error {
return mysqld.executeSuperQuery(query)
}
var ERR_NOT_SLAVE = errors.New("no slave status")
var ErrNotSlave = errors.New("no slave status")
func (mysqld *Mysqld) slaveStatus() (map[string]string, error) {
rows, err := mysqld.fetchSuperQuery("SHOW SLAVE STATUS")
@ -224,7 +228,7 @@ func (mysqld *Mysqld) slaveStatus() (map[string]string, error) {
return nil, err
}
if len(rows) != 1 {
return nil, ERR_NOT_SLAVE
return nil, ErrNotSlave
}
rowMap := make(map[string]string)

Просмотреть файл

@ -208,7 +208,7 @@ func (mysqld *Mysqld) CreateSplitReplicaSource(dbName, keyName string, startKey,
masterAddr := ""
replicationPosition, statusErr := mysqld.SlaveStatus()
if statusErr != nil {
if statusErr != ERR_NOT_SLAVE {
if statusErr != ErrNotSlave {
// this is a real error
return nil, statusErr
}

Просмотреть файл

@ -37,7 +37,7 @@ On X: (Promote Slave)
RESET SLAVE;
SHOW MASTER STATUS;
replication file,position
INSERT INTO _vt.replication_test (time_created_ns) VALUES (<time>);
INSERT INTO _vt.replication_log (time_created_ns, 'reparent check') VALUES (<time>);
SHOW MASTER STATUS;
wait file,position
SET GLOBAL READ_ONLY=0;
@ -54,7 +54,7 @@ For all slaves in majority N:
CHANGE MASTER TO X;
START SLAVE;
SELECT MASTER_POS_WAIT(file, pos, deadline)
SELECT time_created FROM _vt.replication_test WHERE time_created_ns = <time>;
SELECT time_created FROM _vt.replication_log WHERE time_created_ns = <time>;
if no connection to N is available, ???