Add Close func to HealthCheck.

This commit is contained in:
Liang Guo 2015-12-18 15:15:42 -08:00
Родитель 03f324c767
Коммит d6a0f27d3f
5 изменённых файлов: 59 добавлений и 8 удалений

Просмотреть файл

@ -17,7 +17,7 @@ type fakeHealthCheck struct {
}
// SetListener sets the listener for healthcheck updates.
func (fhc *fakeHealthCheck) SetListener(listener HealthCheckStatsListener) {
func (*fakeHealthCheck) SetListener(listener HealthCheckStatsListener) {
}
// AddEndPoint adds the endpoint, and starts health check.
@ -37,22 +37,27 @@ func (fhc *fakeHealthCheck) RemoveEndPoint(endPoint *topodatapb.EndPoint) {
}
// GetEndPointStatsFromKeyspaceShard returns all EndPointStats for the given keyspace/shard.
func (fhc *fakeHealthCheck) GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*EndPointStats {
func (*fakeHealthCheck) GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*EndPointStats {
return nil
}
// GetEndPointStatsFromTarget returns all EndPointStats for the given target.
func (fhc *fakeHealthCheck) GetEndPointStatsFromTarget(keyspace, shard string, tabletType topodatapb.TabletType) []*EndPointStats {
func (*fakeHealthCheck) GetEndPointStatsFromTarget(keyspace, shard string, tabletType topodatapb.TabletType) []*EndPointStats {
return nil
}
// GetConnection returns the TabletConn of the given endpoint.
func (fhc *fakeHealthCheck) GetConnection(endPoint *topodatapb.EndPoint) tabletconn.TabletConn {
func (*fakeHealthCheck) GetConnection(endPoint *topodatapb.EndPoint) tabletconn.TabletConn {
return nil
}
// CacheStatus returns a displayable version of the cache.
func (fhc *fakeHealthCheck) CacheStatus() EndPointsCacheStatusList {
func (*fakeHealthCheck) CacheStatus() EndPointsCacheStatusList {
return nil
}
// Close stops the healthcheck.
func (*fakeHealthCheck) Close() error {
return nil
}

Просмотреть файл

@ -60,6 +60,8 @@ type HealthCheck interface {
GetConnection(endPoint *topodatapb.EndPoint) tabletconn.TabletConn
// CacheStatus returns a displayable version of the cache.
CacheStatus() EndPointsCacheStatusList
// Close stops the healthcheck.
Close() error
}
// NewHealthCheck creates a new HealthCheck object.
@ -70,14 +72,31 @@ func NewHealthCheck(connTimeout time.Duration, retryDelay time.Duration, healthC
connTimeout: connTimeout,
retryDelay: retryDelay,
healthCheckTimeout: healthCheckTimeout,
closeChan: make(chan struct{}),
}
if hcConnCounters == nil {
hcConnCounters = stats.NewMultiCountersFunc("HealthcheckConnections", []string{"keyspace", "shardname", "tablettype"}, hc.servingConnStats)
}
go func() {
// Start another go routine to check timeout.
// Currently vttablet sends healthcheck response every 20 seconds.
// We set the default timeout to 1 minute (20s * 3),
// and also perform the timeout check in sync with vttablet frequency.
// When we change the healthcheck frequency on vttablet,
// we should also adjust here.
t := time.NewTicker(healthCheckTimeout / 3)
for range t.C {
hc.checkHealthCheckTimeout()
defer t.Stop()
for {
select {
case <-hc.closeChan:
return
case _, ok := <-t.C:
if !ok {
// the ticker stoped
return
}
hc.checkHealthCheckTimeout()
}
}
}()
return hc
@ -90,6 +109,7 @@ type HealthCheckImpl struct {
connTimeout time.Duration
retryDelay time.Duration
healthCheckTimeout time.Duration
closeChan chan struct{} // signals the process gorouting to terminate
// mu protects all the following fields
// when locking both mutex from HealthCheck and healthCheckConn, HealthCheck.mu goes first.
@ -126,10 +146,13 @@ func (hc *HealthCheckImpl) servingConnStats() map[string]int64 {
hc.mu.RLock()
defer hc.mu.RUnlock()
for _, hcc := range hc.addrToConns {
hcc.mu.RLock()
if !hcc.up || !hcc.serving || hcc.lastError != nil {
hcc.mu.RUnlock()
continue
}
key := fmt.Sprintf("%s.%s.%s", hcc.target.Keyspace, hcc.target.Shard, strings.ToLower(hcc.target.TabletType.String()))
hcc.mu.RUnlock()
res[key]++
}
return res
@ -690,6 +713,20 @@ func (hc *HealthCheckImpl) CacheStatus() EndPointsCacheStatusList {
return epcsl
}
// Close stops the healthcheck.
func (hc *HealthCheckImpl) Close() error {
hc.mu.Lock()
defer hc.mu.Unlock()
close(hc.closeChan)
hc.listener = nil
for _, hcc := range hc.addrToConns {
hcc.cancelFunc()
}
hc.addrToConns = make(map[string]*healthCheckConn)
hc.targetToEPs = make(map[string]map[string]map[topodatapb.TabletType][]*topodatapb.EndPoint)
return nil
}
// EndPointToMapKey creates a key to the map from endpoint's host and ports.
// It should only be used in discovery and related module.
func EndPointToMapKey(endPoint *topodatapb.EndPoint) string {

Просмотреть файл

@ -182,6 +182,8 @@ func TestHealthCheck(t *testing.T) {
if len(epsList) != 0 {
t.Errorf(`hc.GetEndPointStatsFromKeyspaceShard("k", "s") = %+v; want empty`, epsList)
}
// close healthcheck
hc.Close()
}
func TestHealthCheckTimeout(t *testing.T) {
@ -245,6 +247,8 @@ func TestHealthCheckTimeout(t *testing.T) {
if len(epsList) != 1 || !reflect.DeepEqual(epsList[0], want) {
t.Errorf(`hc.GetEndPointStatsFromKeyspaceShard("k", "s") = %+v; want %+v`, epsList, want)
}
// close healthcheck
hc.Close()
}
type listener struct {

Просмотреть файл

@ -39,7 +39,7 @@ var (
healthcheckTopologyRefresh = flag.Duration("binlog_player_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology when filtered replication is running")
retryDelay = flag.Duration("binlog_player_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck or a failed binlog connection")
healthCheckTimeout = flag.Duration("healthcheck_timeout", time.Minute, "the health check timeout period")
healthCheckTimeout = flag.Duration("binlog_player_healthcheck_timeout", time.Minute, "the health check timeout period")
)
func init() {

Просмотреть файл

@ -325,6 +325,11 @@ func (fhc *fakeHealthCheck) CacheStatus() discovery.EndPointsCacheStatusList {
return nil
}
// Close stops the healthcheck.
func (fhc *fakeHealthCheck) Close() error {
return nil
}
func (fhc *fakeHealthCheck) addTestEndPoint(cell, host string, port int32, keyspace, shard string, tabletType topodatapb.TabletType, serving bool, reparentTS int64, err error, conn tabletconn.TabletConn) *topodatapb.EndPoint {
ep := topo.NewEndPoint(0, host)
ep.PortMap["vt"] = port