зеркало из https://github.com/github/vitess-gh.git
Add event.DispatchUpdate() to update and dispatch an event in one go.
This commit is contained in:
Родитель
f8163a782f
Коммит
9454c28ea1
|
@ -140,3 +140,17 @@ func callListeners(t reflect.Type, vals []reflect.Value) {
|
|||
reflect.ValueOf(fn).Call(vals)
|
||||
}
|
||||
}
|
||||
|
||||
// Updater is an interface that events can implement to combine updating and
|
||||
// dispatching into one call.
|
||||
type Updater interface {
|
||||
// Update is called by DispatchUpdate() before the event is dispatched.
|
||||
Update(update interface{})
|
||||
}
|
||||
|
||||
// DispatchUpdate calls Update() on the event and then dispatches it. This is a
|
||||
// shortcut for combining updates and dispatches into a single call.
|
||||
func DispatchUpdate(ev Updater, update interface{}) {
|
||||
ev.Update(update)
|
||||
Dispatch(ev)
|
||||
}
|
||||
|
|
|
@ -210,3 +210,31 @@ func TestDispatchValueToPointerInterfaceListener(t *testing.T) {
|
|||
})
|
||||
Dispatch(testEvent2{})
|
||||
}
|
||||
|
||||
type testUpdateEvent struct {
|
||||
update interface{}
|
||||
}
|
||||
|
||||
func (ev *testUpdateEvent) Update(update interface{}) {
|
||||
ev.update = update
|
||||
}
|
||||
|
||||
func TestDispatchUpdate(t *testing.T) {
|
||||
clearListeners()
|
||||
|
||||
triggered := false
|
||||
AddListener(func(*testUpdateEvent) {
|
||||
triggered = true
|
||||
})
|
||||
|
||||
ev := &testUpdateEvent{}
|
||||
DispatchUpdate(ev, "hello")
|
||||
|
||||
if !triggered {
|
||||
t.Errorf("listener failed to trigger on DispatchUpdate()")
|
||||
}
|
||||
want := "hello"
|
||||
if got := ev.update.(string); got != want {
|
||||
t.Errorf("ev.update = %#v, want %#v", got, want)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package events defines common structures used for events dispatched from
|
||||
// various other package.
|
||||
package events
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// StatusUpdater is a base struct for multi-part events with a status string
|
||||
// that gets updated as the process progresses. StatusUpdater implements
|
||||
// event.Updater, so if you embed a StatusUpdater into an event type, you can
|
||||
// set a new status and dispatch that event in one call with DispatchUpdate.
|
||||
//
|
||||
// For example:
|
||||
//
|
||||
// type MyEvent struct {
|
||||
// StatusUpdater
|
||||
// }
|
||||
// ev := &MyEvent{}
|
||||
// event.DispatchUpdate(ev, "new status")
|
||||
type StatusUpdater struct {
|
||||
Status string
|
||||
|
||||
// EventID is used to group the steps of a multi-part event.
|
||||
// It is set internally the first time Update() is called.
|
||||
EventID int64
|
||||
}
|
||||
|
||||
// Update sets a new status and initializes the EventID if necessary.
|
||||
// This implements event.Updater.Update().
|
||||
func (su *StatusUpdater) Update(status interface{}) {
|
||||
su.Status = status.(string)
|
||||
|
||||
// initialize event ID
|
||||
if su.EventID == 0 {
|
||||
su.EventID = time.Now().UnixNano()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,58 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/youtube/vitess/go/event"
|
||||
)
|
||||
|
||||
type testEvent struct {
|
||||
StatusUpdater
|
||||
}
|
||||
|
||||
func TestUpdateInit(t *testing.T) {
|
||||
want := "status"
|
||||
ev := &testEvent{}
|
||||
ev.Update("status")
|
||||
|
||||
if ev.Status != want {
|
||||
t.Errorf("ev.Status = %#v, want %#v", ev.Status, want)
|
||||
}
|
||||
if ev.EventID == 0 {
|
||||
t.Errorf("ev.EventID wasn't initialized")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateEventID(t *testing.T) {
|
||||
want := int64(12345)
|
||||
ev := &testEvent{}
|
||||
ev.EventID = 12345
|
||||
|
||||
ev.Update("status")
|
||||
|
||||
if ev.EventID != want {
|
||||
t.Errorf("ev.EventID = %v, want %v", ev.EventID, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateDispatch(t *testing.T) {
|
||||
triggered := false
|
||||
event.AddListener(func(ev *testEvent) {
|
||||
triggered = true
|
||||
})
|
||||
|
||||
want := "status"
|
||||
ev := &testEvent{}
|
||||
event.DispatchUpdate(ev, "status")
|
||||
|
||||
if ev.Status != want {
|
||||
t.Errorf("ev.Status = %#v, want %#v", ev.Status, want)
|
||||
}
|
||||
if !triggered {
|
||||
t.Errorf("listener wasn't triggered on Dispatch()")
|
||||
}
|
||||
}
|
|
@ -1,37 +1,20 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package events defines the structures used for events dispatched from the
|
||||
// wrangler package.
|
||||
package events
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/event"
|
||||
base "github.com/youtube/vitess/go/vt/events"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
)
|
||||
|
||||
// Reparent is an event that describes a single step in the reparent process.
|
||||
type Reparent struct {
|
||||
ShardInfo topo.ShardInfo
|
||||
base.StatusUpdater
|
||||
|
||||
ShardInfo topo.ShardInfo
|
||||
OldMaster, NewMaster topo.Tablet
|
||||
|
||||
Status string
|
||||
|
||||
// eventID is used to group the steps of a single reparent in progress.
|
||||
// It is set internally the first time UpdateStatus() is called.
|
||||
eventID int64
|
||||
}
|
||||
|
||||
// UpdateStatus sets a new status and then dispatches the event.
|
||||
func (r *Reparent) UpdateStatus(status string) {
|
||||
r.Status = status
|
||||
|
||||
// initialize event ID
|
||||
if r.eventID == 0 {
|
||||
r.eventID = time.Now().UnixNano()
|
||||
}
|
||||
|
||||
// Dispatch must be synchronous here to avoid dropping events that are
|
||||
// queued up just before main() returns.
|
||||
event.Dispatch(r)
|
||||
}
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
|
|
|
@ -1,9 +1,14 @@
|
|||
// Copyright 2014, Google Inc. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package events
|
||||
|
||||
import (
|
||||
"log/syslog"
|
||||
"testing"
|
||||
|
||||
base "github.com/youtube/vitess/go/vt/events"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
)
|
||||
|
||||
|
@ -23,7 +28,7 @@ func TestReparentSyslog(t *testing.T) {
|
|||
Uid: 54321,
|
||||
},
|
||||
},
|
||||
Status: "status",
|
||||
StatusUpdater: base.StatusUpdater{Status: "status"},
|
||||
}
|
||||
gotSev, gotMsg := tc.Syslog()
|
||||
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
package events
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestUpdateStatus(t *testing.T) {
|
||||
r := &Reparent{
|
||||
Status: "status1",
|
||||
}
|
||||
|
||||
r.UpdateStatus("status2")
|
||||
|
||||
if r.Status != "status2" {
|
||||
t.Errorf("got %v, want status2", r.Status)
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ package wrangler
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/youtube/vitess/go/event"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topotools"
|
||||
"github.com/youtube/vitess/go/vt/wrangler/events"
|
||||
|
@ -17,11 +18,11 @@ import (
|
|||
// The ev parameter is an event struct prefilled with information that the
|
||||
// caller has on hand, which would be expensive for us to re-query.
|
||||
func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo, slaveTabletMap, masterTabletMap map[topo.TabletAlias]*topo.TabletInfo, masterElectTablet *topo.TabletInfo, leaveMasterReadOnly, force bool) (err error) {
|
||||
ev.UpdateStatus("starting brutal")
|
||||
event.DispatchUpdate(ev, "starting brutal")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
ev.UpdateStatus("failed: " + err.Error())
|
||||
event.DispatchUpdate(ev, "failed: "+err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -35,18 +36,18 @@ func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo,
|
|||
// has not been forced.
|
||||
if !force {
|
||||
// Make sure all tablets have the right parent and reasonable positions.
|
||||
ev.UpdateStatus("checking slave replication positions")
|
||||
event.DispatchUpdate(ev, "checking slave replication positions")
|
||||
if err := wr.checkSlaveReplication(slaveTabletMap, topo.NO_TABLET); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the master-elect is fit for duty - call out for hardware checks.
|
||||
ev.UpdateStatus("checking that new master is ready to serve")
|
||||
event.DispatchUpdate(ev, "checking that new master is ready to serve")
|
||||
if err := wr.checkMasterElect(masterElectTablet); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev.UpdateStatus("checking slave consistency")
|
||||
event.DispatchUpdate(ev, "checking slave consistency")
|
||||
wr.logger.Infof("check slaves %v/%v", masterElectTablet.Keyspace, masterElectTablet.Shard)
|
||||
restartableSlaveTabletMap := wr.restartableTabletMap(slaveTabletMap)
|
||||
err = wr.checkSlaveConsistency(restartableSlaveTabletMap, nil)
|
||||
|
@ -54,7 +55,7 @@ func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo,
|
|||
return err
|
||||
}
|
||||
} else {
|
||||
ev.UpdateStatus("stopping slave replication")
|
||||
event.DispatchUpdate(ev, "stopping slave replication")
|
||||
wr.logger.Infof("forcing reparent to same master %v", masterElectTablet.Alias)
|
||||
err := wr.breakReplication(slaveTabletMap, masterElectTablet)
|
||||
if err != nil {
|
||||
|
@ -62,7 +63,7 @@ func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo,
|
|||
}
|
||||
}
|
||||
|
||||
ev.UpdateStatus("promoting new master")
|
||||
event.DispatchUpdate(ev, "promoting new master")
|
||||
rsd, err := wr.promoteSlave(masterElectTablet)
|
||||
if err != nil {
|
||||
// FIXME(msolomon) This suggests that the master-elect is dead.
|
||||
|
@ -74,12 +75,12 @@ func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo,
|
|||
delete(slaveTabletMap, masterElectTablet.Alias)
|
||||
delete(masterTabletMap, masterElectTablet.Alias)
|
||||
|
||||
ev.UpdateStatus("restarting slaves")
|
||||
event.DispatchUpdate(ev, "restarting slaves")
|
||||
majorityRestart, restartSlaveErr := wr.restartSlaves(slaveTabletMap, rsd)
|
||||
|
||||
if !force {
|
||||
for _, failedMaster := range masterTabletMap {
|
||||
ev.UpdateStatus("scrapping old master")
|
||||
event.DispatchUpdate(ev, "scrapping old master")
|
||||
wr.logger.Infof("scrap dead master %v", failedMaster.Alias)
|
||||
// The master is dead so execute the action locally instead of
|
||||
// enqueing the scrap action for an arbitrary amount of time.
|
||||
|
@ -89,13 +90,13 @@ func (wr *Wrangler) reparentShardBrutal(ev *events.Reparent, si *topo.ShardInfo,
|
|||
}
|
||||
}
|
||||
|
||||
ev.UpdateStatus("rebuilding shard serving graph")
|
||||
event.DispatchUpdate(ev, "rebuilding shard serving graph")
|
||||
err = wr.finishReparent(si, masterElectTablet, majorityRestart, leaveMasterReadOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev.UpdateStatus("finished")
|
||||
event.DispatchUpdate(ev, "finished")
|
||||
|
||||
if restartSlaveErr != nil {
|
||||
// This is more of a warning at this point.
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/youtube/vitess/go/event"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/actionnode"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/topotools"
|
||||
|
@ -79,7 +80,7 @@ func (wr *Wrangler) shardExternallyReparentedLocked(keyspace, shard string, mast
|
|||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
ev.UpdateStatus("failed: " + err.Error())
|
||||
event.DispatchUpdate(ev, "failed: "+err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -99,7 +100,7 @@ func (wr *Wrangler) shardExternallyReparentedLocked(keyspace, shard string, mast
|
|||
}
|
||||
|
||||
// now update the master record in the shard object
|
||||
ev.UpdateStatus("updating shard record")
|
||||
event.DispatchUpdate(ev, "updating shard record")
|
||||
wr.logger.Infof("Updating Shard's MasterAlias record")
|
||||
shardInfo.MasterAlias = masterElectTabletAlias
|
||||
if err = wr.ts.UpdateShard(shardInfo); err != nil {
|
||||
|
@ -107,13 +108,13 @@ func (wr *Wrangler) shardExternallyReparentedLocked(keyspace, shard string, mast
|
|||
}
|
||||
|
||||
// and rebuild the shard serving graph
|
||||
ev.UpdateStatus("rebuilding shard serving graph")
|
||||
event.DispatchUpdate(ev, "rebuilding shard serving graph")
|
||||
wr.logger.Infof("Rebuilding shard serving graph data")
|
||||
if err = topotools.RebuildShard(wr.logger, wr.ts, masterElectTablet.Keyspace, masterElectTablet.Shard, cells, wr.lockTimeout, interrupted); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev.UpdateStatus("finished")
|
||||
event.DispatchUpdate(ev, "finished")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -122,10 +123,10 @@ func (wr *Wrangler) shardExternallyReparentedLocked(keyspace, shard string, mast
|
|||
// The ev parameter is an event struct prefilled with information that the
|
||||
// caller has on hand, which would be expensive for us to re-query.
|
||||
func (wr *Wrangler) reparentShardExternal(ev *events.Reparent, slaveTabletMap, masterTabletMap map[topo.TabletAlias]*topo.TabletInfo, masterElectTablet *topo.TabletInfo) error {
|
||||
ev.UpdateStatus("starting external")
|
||||
event.DispatchUpdate(ev, "starting external")
|
||||
|
||||
// we fix the new master in the replication graph
|
||||
ev.UpdateStatus("checking if new master was promoted")
|
||||
event.DispatchUpdate(ev, "checking if new master was promoted")
|
||||
err := wr.slaveWasPromoted(masterElectTablet)
|
||||
if err != nil {
|
||||
// This suggests that the master-elect is dead. This is bad.
|
||||
|
@ -137,14 +138,14 @@ func (wr *Wrangler) reparentShardExternal(ev *events.Reparent, slaveTabletMap, m
|
|||
delete(masterTabletMap, masterElectTablet.Alias)
|
||||
|
||||
// Re-read the master elect tablet, as its mysql port may have been updated
|
||||
ev.UpdateStatus("re-reading new master record")
|
||||
event.DispatchUpdate(ev, "re-reading new master record")
|
||||
masterElectTablet, err = wr.TopoServer().GetTablet(masterElectTablet.Alias)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot re-read the master record, something is seriously wrong: %v", err)
|
||||
}
|
||||
|
||||
// then fix all the slaves, including the old master
|
||||
ev.UpdateStatus("restarting slaves")
|
||||
event.DispatchUpdate(ev, "restarting slaves")
|
||||
wr.restartSlavesExternal(slaveTabletMap, masterTabletMap, masterElectTablet)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/youtube/vitess/go/event"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/wrangler/events"
|
||||
)
|
||||
|
@ -16,11 +17,11 @@ import (
|
|||
// The ev parameter is an event struct prefilled with information that the
|
||||
// caller has on hand, which would be expensive for us to re-query.
|
||||
func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInfo, slaveTabletMap, masterTabletMap map[topo.TabletAlias]*topo.TabletInfo, masterElectTablet *topo.TabletInfo, leaveMasterReadOnly bool) (err error) {
|
||||
ev.UpdateStatus("starting graceful")
|
||||
event.DispatchUpdate(ev, "starting graceful")
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
ev.UpdateStatus("failed: " + err.Error())
|
||||
event.DispatchUpdate(ev, "failed: "+err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -58,20 +59,20 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
}
|
||||
|
||||
// Make sure all tablets have the right parent and reasonable positions.
|
||||
ev.UpdateStatus("checking slave replication positions")
|
||||
event.DispatchUpdate(ev, "checking slave replication positions")
|
||||
err = wr.checkSlaveReplication(slaveTabletMap, masterTablet.Alias.Uid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check the master-elect is fit for duty - call out for hardware checks.
|
||||
ev.UpdateStatus("checking that new master is ready to serve")
|
||||
event.DispatchUpdate(ev, "checking that new master is ready to serve")
|
||||
err = wr.checkMasterElect(masterElectTablet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev.UpdateStatus("demoting old master")
|
||||
event.DispatchUpdate(ev, "demoting old master")
|
||||
masterPosition, err := wr.demoteMaster(masterTablet)
|
||||
if err != nil {
|
||||
// FIXME(msolomon) This suggests that the master is dead and we
|
||||
|
@ -80,7 +81,7 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
return fmt.Errorf("demote master failed: %v, if the master is dead, run: vtctl -force ScrapTablet %v", err, masterTablet.Alias)
|
||||
}
|
||||
|
||||
ev.UpdateStatus("checking slave consistency")
|
||||
event.DispatchUpdate(ev, "checking slave consistency")
|
||||
wr.logger.Infof("check slaves %v/%v", masterTablet.Keyspace, masterTablet.Shard)
|
||||
restartableSlaveTabletMap := wr.restartableTabletMap(slaveTabletMap)
|
||||
err = wr.checkSlaveConsistency(restartableSlaveTabletMap, masterPosition)
|
||||
|
@ -88,7 +89,7 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
return fmt.Errorf("check slave consistency failed %v, demoted master is still read only, run: vtctl SetReadWrite %v", err, masterTablet.Alias)
|
||||
}
|
||||
|
||||
ev.UpdateStatus("promoting new master")
|
||||
event.DispatchUpdate(ev, "promoting new master")
|
||||
rsd, err := wr.promoteSlave(masterElectTablet)
|
||||
if err != nil {
|
||||
// FIXME(msolomon) This suggests that the master-elect is dead.
|
||||
|
@ -99,7 +100,7 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
// Once the slave is promoted, remove it from our map
|
||||
delete(slaveTabletMap, masterElectTablet.Alias)
|
||||
|
||||
ev.UpdateStatus("restarting slaves")
|
||||
event.DispatchUpdate(ev, "restarting slaves")
|
||||
majorityRestart, restartSlaveErr := wr.restartSlaves(slaveTabletMap, rsd)
|
||||
|
||||
// For now, scrap the old master regardless of how many
|
||||
|
@ -107,7 +108,7 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
//
|
||||
// FIXME(msolomon) We could reintroduce it and reparent it and use
|
||||
// it as new replica.
|
||||
ev.UpdateStatus("scrapping old master")
|
||||
event.DispatchUpdate(ev, "scrapping old master")
|
||||
wr.logger.Infof("scrap demoted master %v", masterTablet.Alias)
|
||||
scrapActionPath, scrapErr := wr.ai.Scrap(masterTablet.Alias)
|
||||
if scrapErr == nil {
|
||||
|
@ -118,13 +119,13 @@ func (wr *Wrangler) reparentShardGraceful(ev *events.Reparent, si *topo.ShardInf
|
|||
wr.logger.Warningf("scrap demoted master failed: %v", scrapErr)
|
||||
}
|
||||
|
||||
ev.UpdateStatus("rebuilding shard serving graph")
|
||||
event.DispatchUpdate(ev, "rebuilding shard serving graph")
|
||||
err = wr.finishReparent(si, masterElectTablet, majorityRestart, leaveMasterReadOnly)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ev.UpdateStatus("finished")
|
||||
event.DispatchUpdate(ev, "finished")
|
||||
|
||||
if restartSlaveErr != nil {
|
||||
// This is more of a warning at this point.
|
||||
|
|
Загрузка…
Ссылка в новой задаче