blp_checkpoint has no relay positions any more.

Also removing useCheckpoint flag in vt_binlog_player, it always does now.
This commit is contained in:
Alain Jobart 2013-08-01 13:58:20 -07:00
Родитель 715699e32b
Коммит 3e5984d9de
3 изменённых файлов: 22 добавлений и 77 удалений

Просмотреть файл

@ -18,8 +18,6 @@ CREATE TABLE _vt.blp_checkpoint (
port int NOT NULL,
master_filename varchar(255) NOT NULL,
master_position bigint(20) unsigned NOT NULL,
relay_filename varchar(255) default NULL,
relay_position bigint(20) unsigned default 0,
group_id varchar(255) default NULL,
keyrange_start varchar(32) NOT NULL,
keyrange_end varchar(32) NOT NULL,

Просмотреть файл

@ -47,7 +47,6 @@ var (
txnBatch = flag.Int("txn-batch", TXN_BATCH, "transaction batch size")
maxTxnInterval = flag.Int("max-txn-interval", MAX_TXN_INTERVAL, "max txn interval")
startPosFile = flag.String("start-pos-file", "", "server address and start coordinates")
useCheckpoint = flag.Bool("use-checkpoint", false, "use the saved checkpoint to start")
dbConfigFile = flag.String("db-config-file", "", "json file for db credentials")
lookupConfigFile = flag.String("lookup-config-file", "", "json file for lookup db credentials")
debug = flag.Bool("debug", true, "run a debug version - prints the sql statements rather than executing them")
@ -72,11 +71,9 @@ var (
SPACE = " "
USE_VT = "use _vt"
USE_DB = "use %v"
INSERT_INTO_RECOVERY = `insert into _vt.blp_checkpoint (uid, host, port, master_filename, master_position, relay_filename, relay_position, group_id, keyrange_start, keyrange_end, txn_timestamp, time_updated)
values (%v, '%v', %v, '%v', %v, '%v', %v, '%v', '%v', '%v', unix_timestamp(), %v)`
UPDATE_RECOVERY = "update _vt.blp_checkpoint set master_filename='%v', master_position=%v, relay_filename='%v', relay_position=%v, group_id='%v', txn_timestamp=unix_timestamp(), time_updated=%v where uid=%v"
UPDATE_PORT = "update _vt.blp_checkpoint set port=%v where uid=%v"
SELECT_FROM_RECOVERY = "select * from _vt.blp_checkpoint where uid=%v"
UPDATE_RECOVERY = "update _vt.blp_checkpoint set master_filename='%v', master_position=%v, group_id='%v', txn_timestamp=unix_timestamp(), time_updated=%v where uid=%v"
UPDATE_PORT = "update _vt.blp_checkpoint set port=%v where uid=%v"
SELECT_FROM_RECOVERY = "select * from _vt.blp_checkpoint where uid=%v"
)
/*
@ -279,10 +276,9 @@ func (blp *BinlogPlayer) updatePort(port int, uid uint32, useDb string) {
func (blp *BinlogPlayer) WriteRecoveryPosition(currentPosition *mysqlctl.ReplicationCoordinates, groupId string) {
blp.recoveryState.Position = *currentPosition
updateRecovery := fmt.Sprintf(UPDATE_RECOVERY, currentPosition.MasterFilename,
updateRecovery := fmt.Sprintf(UPDATE_RECOVERY,
currentPosition.MasterFilename,
currentPosition.MasterPosition,
currentPosition.RelayFilename,
currentPosition.RelayPosition,
groupId,
time.Now().Unix(),
blp.recoveryState.Uid)
@ -309,7 +305,7 @@ func main() {
relog.Fatal("Cannot start without db-config-file")
}
blp, err := initBinlogPlayer(*startPosFile, *dbConfigFile, *lookupConfigFile, *dbCredFile, *useCheckpoint, *debug, *port)
blp, err := initBinlogPlayer(*startPosFile, *dbConfigFile, *lookupConfigFile, *dbCredFile, *debug, *port)
if err != nil {
relog.Fatal("Error in initializing binlog player - '%v'", err)
}
@ -460,21 +456,6 @@ func getStartPosition(qr *proto.QueryResult) (*mysqlctl.ReplicationCoordinates,
}
startPosition.MasterPosition = masterPos
}
case "relay_filename":
val := row[i]
if !val.IsNull() {
startPosition.RelayFilename = val.String()
}
case "relay_position":
val := row[i]
if !val.IsNull() {
strVal := val.String()
relayPos, err := strconv.ParseUint(strVal, 0, 64)
if err != nil {
return nil, fmt.Errorf("Couldn't obtain correct value for '%v'", field.Name)
}
startPosition.RelayPosition = relayPos
}
default:
continue
}
@ -482,7 +463,7 @@ func getStartPosition(qr *proto.QueryResult) (*mysqlctl.ReplicationCoordinates,
return startPosition, nil
}
func initBinlogPlayer(startPosFile, dbConfigFile, lookupConfigFile, dbCredFile string, useCheckpoint, debug bool, port int) (*BinlogPlayer, error) {
func initBinlogPlayer(startPosFile, dbConfigFile, lookupConfigFile, dbCredFile string, debug bool, port int) (*BinlogPlayer, error) {
startData, err := ioutil.ReadFile(startPosFile)
if err != nil {
return nil, fmt.Errorf("Error %s in reading start position file %s", err, startPosFile)
@ -497,21 +478,20 @@ func initBinlogPlayer(startPosFile, dbConfigFile, lookupConfigFile, dbCredFile s
if err != nil {
return nil, err
}
if useCheckpoint {
selectRecovery := fmt.Sprintf(SELECT_FROM_RECOVERY, startPosition.Uid)
qr, err := dbClient.ExecuteFetch(selectRecovery, 1, true)
if err != nil {
panic(fmt.Errorf("Error %v in selecting from recovery table %v", err, selectRecovery))
}
if qr.RowsAffected != 1 {
relog.Fatal("Checkpoint information not available in db")
}
startCoord, err := getStartPosition(qr)
if err != nil {
relog.Fatal("Error in obtaining checkpoint information")
}
startPosition.Position = *startCoord
selectRecovery := fmt.Sprintf(SELECT_FROM_RECOVERY, startPosition.Uid)
qr, err := dbClient.ExecuteFetch(selectRecovery, 1, true)
if err != nil {
panic(fmt.Errorf("Error %v in selecting from recovery table %v", err, selectRecovery))
}
if qr.RowsAffected != 1 {
relog.Fatal("Checkpoint information not available in db")
}
startCoord, err := getStartPosition(qr)
if err != nil {
relog.Fatal("Error in obtaining checkpoint information")
}
startPosition.Position = *startCoord
if !startPositionValid(startPosition) {
return nil, fmt.Errorf("Invalid Start Position")
@ -541,10 +521,6 @@ func initBinlogPlayer(startPosFile, dbConfigFile, lookupConfigFile, dbCredFile s
}
binlogPlayer.lookupClient = *lookupClient
if !useCheckpoint {
initialize_recovery_table(dbClient, startPosition, port)
}
useDb := fmt.Sprintf(USE_DB, dbClient.dbConfig.Dbname)
binlogPlayer.updatePort(port, startPosition.Uid, useDb)
}
@ -552,34 +528,6 @@ func initBinlogPlayer(startPosFile, dbConfigFile, lookupConfigFile, dbCredFile s
return binlogPlayer, nil
}
func initialize_recovery_table(dbClient *DBClient, startPosition *binlogRecoveryState, port int) {
useDb := fmt.Sprintf(USE_DB, dbClient.dbConfig.Dbname)
selectRecovery := fmt.Sprintf(SELECT_FROM_RECOVERY, startPosition.Uid)
qr, err := dbClient.ExecuteFetch(selectRecovery, 1, true)
if err != nil {
panic(fmt.Errorf("Error %v in selecting from recovery table %v", err, selectRecovery))
}
if qr.RowsAffected == 0 {
insertRecovery := fmt.Sprintf(INSERT_INTO_RECOVERY, startPosition.Uid, startPosition.Host,
port,
startPosition.Position.MasterFilename,
startPosition.Position.MasterPosition,
startPosition.Position.RelayFilename,
startPosition.Position.RelayPosition,
"",
startPosition.KeyrangeStart,
startPosition.KeyrangeEnd,
time.Now().Unix())
recoveryDmls := []string{USE_VT, "begin", insertRecovery, "commit", useDb}
for _, sql := range recoveryDmls {
if _, err := dbClient.ExecuteFetch(sql, 0, false); err != nil {
panic(fmt.Errorf("Error %v in inserting into recovery table %v", err, sql))
}
}
}
}
func handleError(err *error, blp *BinlogPlayer) {
lastTxnPosition := blp.recoveryState.Position
if x := recover(); x != nil {

Просмотреть файл

@ -106,8 +106,8 @@ const (
partialSnapshotManifestFile = "partial_snapshot_manifest.json"
SnapshotURLPath = "/snapshot"
INSERT_INTO_RECOVERY = `insert into _vt.blp_checkpoint (uid, host, port, master_filename, master_position, relay_filename, relay_position, group_id, keyrange_start, keyrange_end, txn_timestamp, time_updated)
values (%v, '%v', %v, '%v', %v, '%v', %v, %v, '%v', '%v', unix_timestamp(), %v)`
INSERT_INTO_RECOVERY = `insert into _vt.blp_checkpoint (uid, host, port, master_filename, master_position, group_id, keyrange_start, keyrange_end, txn_timestamp, time_updated)
values (%v, '%v', %v, '%v', %v, '%v', '%v', '%v', unix_timestamp(), %v)`
)
// replaceError replaces original with recent if recent is not nil,
@ -902,7 +902,6 @@ func (mysqld *Mysqld) RestoreFromMultiSnapshot(destinationDbName string, keyRang
port,
manifest.Source.MasterState.ReplicationPosition.MasterLogFile,
manifest.Source.MasterState.ReplicationPosition.MasterLogPosition,
"", 0,
manifest.Source.MasterState.ReplicationPosition.MasterLogGroupId,
keyRange.Start.Hex(),
keyRange.End.Hex(),