зеркало из https://github.com/github/vitess-gh.git
Allow waiting for all tablets to be propagated to TabletStatsCache (#2144)
This change adds the method TopologyWatcher.WaitForInitialTopology(), which waits until the initial topology data has been read and propagated to the TabletRecorder through its AddTablet() method. It also adds the method HealthCheck.WaitForInitialStatsUpdates(), which waits until information about all tablets added via AddTablet() has been propagated to the listener through its StatsUpdate() method. This allows the resharding and schema swap code to be sure that, after they have created the topology watchers, all tablets have been propagated to TabletStatsCache (or another health check listener). The schema swap code is modified here to use these new methods. That code is not tested (because the schema swap code is not fully ready yet), but it serves as an example of how the new methods can be used.
This commit is contained in:
Родитель
890a717db0
Коммит
a16674a81f
|
@ -50,6 +50,10 @@ func (fhc *FakeHealthCheck) SetListener(listener HealthCheckStatsListener, sendD
|
|||
fhc.listener = listener
|
||||
}
|
||||
|
||||
// WaitForInitialStatsUpdates is a no-op for the fake health check: it returns immediately without waiting for any StatsUpdate() call.
|
||||
func (fhc *FakeHealthCheck) WaitForInitialStatsUpdates() {
|
||||
}
|
||||
|
||||
// AddTablet adds the tablet and calls the listener.
|
||||
func (fhc *FakeHealthCheck) AddTablet(tablet *topodatapb.Tablet, name string) {
|
||||
key := TabletToMapKey(tablet)
|
||||
|
|
|
@ -170,6 +170,15 @@ type HealthCheck interface {
|
|||
// Note that the default implementation requires to set the
|
||||
// listener before any tablets are added to the healthcheck.
|
||||
SetListener(listener HealthCheckStatsListener, sendDownEvents bool)
|
||||
// WaitForInitialStatsUpdates waits until all tablets added via
|
||||
// AddTablet() call were propagated to the listener via corresponding
|
||||
// StatsUpdate() calls. Note that code path from AddTablet() to
|
||||
// corresponding StatsUpdate() is asynchronous but not cancelable, thus
|
||||
// this function is also non-cancelable and can't return error. Also
|
||||
// note that all AddTablet() calls should happen before calling this
|
||||
// method. WaitForInitialStatsUpdates won't wait for StatsUpdate() calls
|
||||
// corresponding to AddTablet() calls made during its execution.
|
||||
WaitForInitialStatsUpdates()
|
||||
// GetConnection returns the TabletConn of the given tablet.
|
||||
GetConnection(key string) tabletconn.TabletConn
|
||||
// CacheStatus returns a displayable version of the cache.
|
||||
|
@ -215,6 +224,9 @@ type HealthCheckImpl struct {
|
|||
|
||||
// addrToConns maps from address to the healthCheckConn object.
|
||||
addrToConns map[string]*healthCheckConn
|
||||
|
||||
// Wait group that's used to wait until all initial StatsUpdate() calls are made after the AddTablet() calls.
|
||||
initialUpdatesWG sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewHealthCheck creates a new HealthCheck object.
|
||||
|
@ -310,6 +322,7 @@ func (hc *HealthCheckImpl) checkConn(hcc *healthCheckConn, name string) {
|
|||
if hc.listener != nil {
|
||||
hc.listener.StatsUpdate(&ts)
|
||||
}
|
||||
hc.initialUpdatesWG.Done()
|
||||
|
||||
// retry health check if it fails
|
||||
for {
|
||||
|
@ -564,6 +577,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodatapb.Tablet, name string) {
|
|||
return
|
||||
}
|
||||
hc.addrToConns[key] = hcc
|
||||
hc.initialUpdatesWG.Add(1)
|
||||
hc.mu.Unlock()
|
||||
|
||||
hc.wg.Add(1)
|
||||
|
@ -576,6 +590,12 @@ func (hc *HealthCheckImpl) RemoveTablet(tablet *topodatapb.Tablet) {
|
|||
go hc.deleteConn(tablet)
|
||||
}
|
||||
|
||||
// WaitForInitialStatsUpdates blocks until the initialUpdatesWG wait group drains, i.e. until every AddTablet() call
|
||||
// that incremented it has been matched by the Done() that checkConn() issues after handing StatsUpdate() to the listener. It takes no context and cannot be cancelled.
|
||||
func (hc *HealthCheckImpl) WaitForInitialStatsUpdates() {
|
||||
hc.initialUpdatesWG.Wait()
|
||||
}
|
||||
|
||||
// GetConnection returns the TabletConn of the given tablet.
|
||||
func (hc *HealthCheckImpl) GetConnection(key string) tabletconn.TabletConn {
|
||||
hc.mu.RLock()
|
||||
|
|
|
@ -84,6 +84,10 @@ type TopologyWatcher struct {
|
|||
// mu protects all variables below
|
||||
mu sync.Mutex
|
||||
tablets map[string]*tabletInfo
|
||||
// firstLoadDone is true when first load of the topology data is done.
|
||||
firstLoadDone bool
|
||||
// firstLoadChan is closed when the initial loading of topology data is done.
|
||||
firstLoadChan chan struct{}
|
||||
}
|
||||
|
||||
// NewTopologyWatcher returns a TopologyWatcher that monitors all
|
||||
|
@ -98,6 +102,7 @@ func NewTopologyWatcher(topoServer topo.Server, tr TabletRecorder, cell string,
|
|||
sem: make(chan int, topoReadConcurrency),
|
||||
tablets: make(map[string]*tabletInfo),
|
||||
}
|
||||
tw.firstLoadChan = make(chan struct{})
|
||||
tw.ctx, tw.cancelFunc = context.WithCancel(context.Background())
|
||||
tw.wg.Add(1)
|
||||
go tw.watch()
|
||||
|
@ -172,9 +177,25 @@ func (tw *TopologyWatcher) loadTablets() {
|
|||
}
|
||||
}
|
||||
tw.tablets = newTablets
|
||||
if !tw.firstLoadDone {
|
||||
tw.firstLoadDone = true
|
||||
close(tw.firstLoadChan)
|
||||
}
|
||||
tw.mu.Unlock()
|
||||
}
|
||||
|
||||
// WaitForInitialTopology blocks until the watcher has finished its first load of the topology data
|
||||
// (signalled by loadTablets() closing firstLoadChan) and returns nil in that case.
|
||||
// If the watcher's own context is done instead, it returns that context's error.
|
||||
func (tw *TopologyWatcher) WaitForInitialTopology() error {
|
||||
select {
|
||||
case <-tw.ctx.Done(): // watcher context was cancelled (Stop() calls tw.cancelFunc())
|
||||
return tw.ctx.Err()
|
||||
case <-tw.firstLoadChan: // closed by loadTablets() once the first load has completed
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the watcher. It does not clean up the tablets added to TabletRecorder.
|
||||
func (tw *TopologyWatcher) Stop() {
|
||||
tw.cancelFunc()
|
||||
|
|
|
@ -99,6 +99,12 @@ func (shardSwap *shardSchemaSwap) startHealthWatchers() error {
|
|||
discovery.DefaultTopoReadConcurrency)
|
||||
shardSwap.tabletWatchers = append(shardSwap.tabletWatchers, watcher)
|
||||
}
|
||||
for _, watcher := range shardSwap.tabletWatchers {
|
||||
if err := watcher.WaitForInitialTopology(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
shardSwap.tabletHealthCheck.WaitForInitialStatsUpdates()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче