diff --git a/go/mysql/filepos_gtid.go b/go/mysql/filepos_gtid.go index 07606a4fc6..d707e74ad5 100644 --- a/go/mysql/filepos_gtid.go +++ b/go/mysql/filepos_gtid.go @@ -146,6 +146,10 @@ func (gtid filePosGTID) Union(other GTIDSet) GTIDSet { return filePosOther } +func (gtid filePosGTID) Last() string { + panic("not implemented") +} + func init() { gtidParsers[filePosFlavorID] = parseFilePosGTID gtidSetParsers[filePosFlavorID] = parseFilePosGTIDSet diff --git a/go/mysql/gtid_set.go b/go/mysql/gtid_set.go index b966e8bc2a..812b7f33ca 100644 --- a/go/mysql/gtid_set.go +++ b/go/mysql/gtid_set.go @@ -49,6 +49,9 @@ type GTIDSet interface { // Union returns a union of the receiver GTIDSet and the supplied GTIDSet. Union(GTIDSet) GTIDSet + + // Union returns a union of the receiver GTIDSet and the supplied GTIDSet. + Last() string } // gtidSetParsers maps flavor names to parser functions. It is used by diff --git a/go/mysql/gtid_test.go b/go/mysql/gtid_test.go index bad7daabc8..67ac707224 100644 --- a/go/mysql/gtid_test.go +++ b/go/mysql/gtid_test.go @@ -193,6 +193,7 @@ type fakeGTID struct { } func (f fakeGTID) String() string { return f.value } +func (f fakeGTID) Last() string { panic("not implemented") } func (f fakeGTID) Flavor() string { return f.flavor } func (fakeGTID) SourceServer() interface{} { return int(1) } func (fakeGTID) SequenceNumber() interface{} { return int(1) } diff --git a/go/mysql/mariadb_gtid.go b/go/mysql/mariadb_gtid.go index 57cca52bc7..9f320bbebf 100644 --- a/go/mysql/mariadb_gtid.go +++ b/go/mysql/mariadb_gtid.go @@ -232,6 +232,11 @@ func (gtidSet MariadbGTIDSet) Union(other GTIDSet) GTIDSet { return newSet } +//Last returns the last gtid +func (gtidSet MariadbGTIDSet) Last() string { + panic("not implemented") +} + // deepCopy returns a deep copy of the set. func (gtidSet MariadbGTIDSet) deepCopy() MariadbGTIDSet { newSet := make(MariadbGTIDSet, len(gtidSet)) diff --git a/go/mysql/mysql56_gtid_set.go b/go/mysql/mysql56_gtid_set.go index 97ccb820b3..e2b6b3e4ef 100644 --- a/go/mysql/mysql56_gtid_set.go +++ b/go/mysql/mysql56_gtid_set.go @@ -171,6 +171,23 @@ func (set Mysql56GTIDSet) String() string { return buf.String() } +//Last returns the last gtid +func (set Mysql56GTIDSet) Last() string { + buf := &bytes.Buffer{} + + if len(set.SIDs()) > 0 { + sid := set.SIDs()[len(set.SIDs())-1] + buf.WriteString(sid.String()) + for _, interval := range set[sid] { + buf.WriteByte(':') + buf.WriteString(strconv.FormatInt(interval.end, 10)) + } + + } + + return buf.String() +} + // Flavor implements GTIDSet. func (Mysql56GTIDSet) Flavor() string { return mysql56FlavorID } diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go index 4d4f66a1c8..40051d9e6c 100644 --- a/go/vt/vttablet/tabletmanager/restore.go +++ b/go/vt/vttablet/tabletmanager/restore.go @@ -19,7 +19,6 @@ package tabletmanager import ( "flag" "fmt" - "strings" "time" "vitess.io/vitess/go/vt/proto/vttime" @@ -257,8 +256,10 @@ func (agent *ActionAgent) getGTIDFromTimestamp(ctx context.Context, pos mysql.Po select { case <-found: + vsClient.Close(timeoutCtx) return <-found case <-timeoutCtx.Done(): + vsClient.Close(timeoutCtx) log.Warningf("Can't find the GTID from restore time stamp, exiting.") return "" } @@ -270,24 +271,22 @@ func (agent *ActionAgent) catchupToGTID(ctx context.Context, gtid string) error if err != nil { return err } - gtidStr := gtidParsed.GTIDSet.String() + gtidStr := gtidParsed.GTIDSet.Last() + log.Infof("gtid to restore upto %s", gtidStr) - gtidNew := strings.Split(gtidStr, ":")[0] + ":" + strings.Split(strings.Split(gtidStr, ":")[1], "-")[1] + //gtidNew := strings.Split(gtidStr, ":")[0] + ":" + strings.Split(strings.Split(gtidStr, ":")[1], "-")[1] // TODO: we can use agent.MysqlDaemon.SetMaster , but it uses replDbConfig cmds := []string{ "STOP SLAVE FOR CHANNEL '' ", "STOP SLAVE IO_THREAD FOR CHANNEL ''", fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s',MASTER_PORT=%d, MASTER_USER='%s', MASTER_AUTO_POSITION = 1;", *binlogHost, *binlogPort, *binlogUser), - fmt.Sprintf(" START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", gtidNew), - "STOP SLAVE", - "RESET SLAVE ALL", + fmt.Sprintf(" START SLAVE UNTIL SQL_BEFORE_GTIDS = '%s'", gtidStr), } fmt.Printf("%v", cmds) if err := agent.MysqlDaemon.ExecuteSuperQueryList(ctx, cmds); err != nil { return vterrors.Wrap(err, "failed to reset slave") } - println("it should have cought up") // TODO: Wait for the replication to happen and then reset the slave, so that we don't be connected to binlog server return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 3133f7d961..f7c2db50ac 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -78,6 +78,7 @@ func (c *replicaConnector) Open(ctx context.Context) error { } func (c *replicaConnector) Close(ctx context.Context) error { + c.shutdown() return nil }