зеркало из https://github.com/github/vitess-gh.git
Merge pull request #7654 from planetscale/ds-fix-7472-7177
healthcheck: update healthy tablets correctly when a stream returns an error or times out
This commit is contained in:
Коммит
58fc9a158a
|
@ -404,20 +404,20 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
|
|||
}
|
||||
}
|
||||
|
||||
func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealthResponse, currentTarget *query.Target, trivialNonMasterUpdate bool, isMasterUpdate bool, isMasterChange bool) {
|
||||
func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Target, trivialUpdate bool, isPrimaryUp bool) {
|
||||
// hc.healthByAlias is authoritative, it should be updated
|
||||
hc.mu.Lock()
|
||||
defer hc.mu.Unlock()
|
||||
|
||||
tabletAlias := tabletAliasString(topoproto.TabletAliasString(shr.TabletAlias))
|
||||
|
||||
hcErrorCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard, topoproto.TabletTypeLString(shr.Target.TabletType)}, 0)
|
||||
targetKey := hc.keyFromTarget(shr.Target)
|
||||
targetChanged := currentTarget.TabletType != shr.Target.TabletType || currentTarget.Keyspace != shr.Target.Keyspace || currentTarget.Shard != shr.Target.Shard
|
||||
tabletAlias := tabletAliasString(topoproto.TabletAliasString(th.Tablet.Alias))
|
||||
targetKey := hc.keyFromTarget(th.Target)
|
||||
targetChanged := prevTarget.TabletType != th.Target.TabletType || prevTarget.Keyspace != th.Target.Keyspace || prevTarget.Shard != th.Target.Shard
|
||||
if targetChanged {
|
||||
// Error counter has to be set here in case we get a new tablet type for the first time in a stream response
|
||||
hcErrorCounters.Add([]string{th.Target.Keyspace, th.Target.Shard, topoproto.TabletTypeLString(th.Target.TabletType)}, 0)
|
||||
// keyspace and shard are not expected to change, but just in case ...
|
||||
// move this tabletHealthCheck to the correct map
|
||||
oldTargetKey := hc.keyFromTarget(currentTarget)
|
||||
oldTargetKey := hc.keyFromTarget(prevTarget)
|
||||
delete(hc.healthData[oldTargetKey], tabletAlias)
|
||||
_, ok := hc.healthData[targetKey]
|
||||
if !ok {
|
||||
|
@ -427,44 +427,52 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, shr *query.StreamHealt
|
|||
// add it to the map by target
|
||||
hc.healthData[targetKey][tabletAlias] = th
|
||||
|
||||
if isMasterUpdate {
|
||||
isPrimary := th.Target.TabletType == topodata.TabletType_MASTER
|
||||
switch {
|
||||
case isPrimary && isPrimaryUp:
|
||||
if len(hc.healthy[targetKey]) == 0 {
|
||||
hc.healthy[targetKey] = append(hc.healthy[targetKey], th)
|
||||
} else {
|
||||
// We already have one up server, see if we
|
||||
// need to replace it.
|
||||
if shr.TabletExternallyReparentedTimestamp < hc.healthy[targetKey][0].MasterTermStartTime {
|
||||
if th.MasterTermStartTime < hc.healthy[targetKey][0].MasterTermStartTime {
|
||||
log.Warningf("not marking healthy master %s as Up for %s because its MasterTermStartTime is smaller than the highest known timestamp from previous MASTERs %s: %d < %d ",
|
||||
topoproto.TabletAliasString(shr.TabletAlias),
|
||||
topoproto.KeyspaceShardString(shr.Target.Keyspace, shr.Target.Shard),
|
||||
topoproto.TabletAliasString(th.Tablet.Alias),
|
||||
topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard),
|
||||
topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias),
|
||||
shr.TabletExternallyReparentedTimestamp,
|
||||
th.MasterTermStartTime,
|
||||
hc.healthy[targetKey][0].MasterTermStartTime)
|
||||
} else {
|
||||
// Just replace it.
|
||||
hc.healthy[targetKey][0] = th
|
||||
}
|
||||
}
|
||||
case isPrimary && !isPrimaryUp:
|
||||
// No healthy master tablet
|
||||
hc.healthy[targetKey] = []*TabletHealth{}
|
||||
}
|
||||
if !trivialNonMasterUpdate {
|
||||
|
||||
if !trivialUpdate {
|
||||
// We re-sort the healthy tablet list whenever we get a health update for tablets we can route to.
|
||||
// Tablets from other cells for non-master targets should not trigger a re-sort;
|
||||
// they should also be excluded from healthy list.
|
||||
if shr.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) {
|
||||
if th.Target.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) {
|
||||
hc.recomputeHealthy(targetKey)
|
||||
}
|
||||
if targetChanged && currentTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(shr.Target.TabletType, shr.TabletAlias) { // also recompute old target's healthy list
|
||||
oldTargetKey := hc.keyFromTarget(currentTarget)
|
||||
if targetChanged && prevTarget.TabletType != topodata.TabletType_MASTER && hc.isIncluded(th.Target.TabletType, th.Tablet.Alias) { // also recompute old target's healthy list
|
||||
oldTargetKey := hc.keyFromTarget(prevTarget)
|
||||
hc.recomputeHealthy(oldTargetKey)
|
||||
}
|
||||
}
|
||||
if isMasterChange {
|
||||
log.Errorf("Adding 1 to MasterPromoted counter for tablet: %v, shr.Tablet: %v, shr.TabletType: %v", currentTarget, topoproto.TabletAliasString(shr.TabletAlias), shr.Target.TabletType)
|
||||
hcMasterPromotedCounters.Add([]string{shr.Target.Keyspace, shr.Target.Shard}, 1)
|
||||
|
||||
isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_MASTER
|
||||
if isNewPrimary {
|
||||
log.Errorf("Adding 1 to MasterPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
|
||||
hcMasterPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1)
|
||||
}
|
||||
|
||||
// broadcast to subscribers
|
||||
hc.broadcast(th)
|
||||
|
||||
}
|
||||
|
||||
func (hc *HealthCheckImpl) recomputeHealthy(key keyspaceShardTabletType) {
|
||||
|
|
|
@ -65,6 +65,8 @@ func init() {
|
|||
}
|
||||
|
||||
func TestHealthCheck(t *testing.T) {
|
||||
// reset error counters
|
||||
hcErrorCounters.ResetAll()
|
||||
ts := memorytopo.NewServer("cell")
|
||||
hc := createTestHc(ts)
|
||||
// close healthcheck
|
||||
|
@ -257,6 +259,69 @@ func TestHealthCheckStreamError(t *testing.T) {
|
|||
result = <-resultChan
|
||||
//TODO: figure out how to compare objects that contain errors using utils.MustMatch
|
||||
assert.True(t, want.DeepEqual(result), "Wrong TabletHealth data\n Expected: %v\n Actual: %v", want, result)
|
||||
// tablet should be removed from healthy list
|
||||
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
|
||||
assert.Empty(t, a, "wrong result, expected empty list")
|
||||
}
|
||||
|
||||
// TestHealthCheckErrorOnPrimary is the same as TestHealthCheckStreamError except for tablet type
|
||||
func TestHealthCheckErrorOnPrimary(t *testing.T) {
|
||||
ts := memorytopo.NewServer("cell")
|
||||
hc := createTestHc(ts)
|
||||
defer hc.Close()
|
||||
|
||||
tablet := createTestTablet(0, "cell", "a")
|
||||
input := make(chan *querypb.StreamHealthResponse)
|
||||
resultChan := hc.Subscribe()
|
||||
fc := createFakeConn(tablet, input)
|
||||
fc.errCh = make(chan error)
|
||||
hc.AddTablet(tablet)
|
||||
|
||||
// Immediately after AddTablet() there will be the first notification.
|
||||
want := &TabletHealth{
|
||||
Tablet: tablet,
|
||||
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
|
||||
Serving: false,
|
||||
MasterTermStartTime: 0,
|
||||
}
|
||||
result := <-resultChan
|
||||
mustMatch(t, want, result, "Wrong TabletHealth data")
|
||||
|
||||
// one tablet after receiving a StreamHealthResponse
|
||||
shr := &querypb.StreamHealthResponse{
|
||||
TabletAlias: tablet.Alias,
|
||||
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
|
||||
Serving: true,
|
||||
TabletExternallyReparentedTimestamp: 10,
|
||||
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
|
||||
}
|
||||
want = &TabletHealth{
|
||||
Tablet: tablet,
|
||||
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
|
||||
Serving: true,
|
||||
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
|
||||
MasterTermStartTime: 10,
|
||||
}
|
||||
input <- shr
|
||||
result = <-resultChan
|
||||
mustMatch(t, want, result, "Wrong TabletHealth data")
|
||||
|
||||
// Stream error
|
||||
fc.errCh <- fmt.Errorf("some stream error")
|
||||
want = &TabletHealth{
|
||||
Tablet: tablet,
|
||||
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER},
|
||||
Serving: false,
|
||||
Stats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
|
||||
MasterTermStartTime: 10,
|
||||
LastError: fmt.Errorf("some stream error"),
|
||||
}
|
||||
result = <-resultChan
|
||||
//TODO: figure out how to compare objects that contain errors using utils.MustMatch
|
||||
assert.True(t, want.DeepEqual(result), "Wrong TabletHealth data\n Expected: %v\n Actual: %v", want, result)
|
||||
// tablet should be removed from healthy list
|
||||
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
|
||||
assert.Empty(t, a, "wrong result, expected empty list")
|
||||
}
|
||||
|
||||
func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
|
||||
|
@ -363,6 +428,8 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHealthCheckTimeout(t *testing.T) {
|
||||
// reset counters
|
||||
hcErrorCounters.ResetAll()
|
||||
ts := memorytopo.NewServer("cell")
|
||||
hc := createTestHc(ts)
|
||||
hc.healthCheckTimeout = 500 * time.Millisecond
|
||||
|
@ -410,6 +477,10 @@ func TestHealthCheckTimeout(t *testing.T) {
|
|||
assert.Nil(t, checkErrorCounter("k", "s", topodatapb.TabletType_REPLICA, 1))
|
||||
assert.True(t, fc.isCanceled(), "StreamHealth should be canceled after timeout, but is not")
|
||||
|
||||
// tablet should be removed from healthy list
|
||||
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA})
|
||||
assert.Empty(t, a, "wrong result, expected empty list")
|
||||
|
||||
// repeat the wait. It will timeout one more time trying to get the connection.
|
||||
fc.resetCanceledFlag()
|
||||
time.Sleep(hc.healthCheckTimeout)
|
||||
|
@ -798,7 +869,7 @@ func TestMasterInOtherCell(t *testing.T) {
|
|||
|
||||
// check that MASTER tablet from other cell IS in healthy tablet list
|
||||
a := hc.GetHealthyTabletStats(&querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_MASTER})
|
||||
assert.Len(t, a, 1, "")
|
||||
require.Len(t, a, 1, "")
|
||||
mustMatch(t, want, a[0], "Expecting healthy master")
|
||||
}
|
||||
|
||||
|
|
|
@ -184,12 +184,10 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
|
|||
return vterrors.New(vtrpc.Code_FAILED_PRECONDITION, fmt.Sprintf("health stats mismatch, tablet %+v alias does not match response alias %v", thc.Tablet, shr.TabletAlias))
|
||||
}
|
||||
|
||||
currentTarget := thc.Target
|
||||
prevTarget := thc.Target
|
||||
// check whether this is a trivial update so as to update healthy map
|
||||
trivialNonMasterUpdate := thc.LastError == nil && thc.Serving && shr.RealtimeStats.HealthError == "" && shr.Serving &&
|
||||
currentTarget.TabletType != topodata.TabletType_MASTER && currentTarget.TabletType == shr.Target.TabletType && thc.isTrivialReplagChange(shr.RealtimeStats)
|
||||
isMasterUpdate := shr.Target.TabletType == topodata.TabletType_MASTER
|
||||
isMasterChange := thc.Target.TabletType != topodata.TabletType_MASTER && shr.Target.TabletType == topodata.TabletType_MASTER
|
||||
trivialUpdate := thc.LastError == nil && thc.Serving && shr.RealtimeStats.HealthError == "" && shr.Serving &&
|
||||
prevTarget.TabletType != topodata.TabletType_MASTER && prevTarget.TabletType == shr.Target.TabletType && thc.isTrivialReplagChange(shr.RealtimeStats)
|
||||
thc.lastResponseTimestamp = time.Now()
|
||||
thc.Target = shr.Target
|
||||
thc.MasterTermStartTime = shr.TabletExternallyReparentedTimestamp
|
||||
|
@ -202,7 +200,7 @@ func (thc *tabletHealthCheck) processResponse(hc *HealthCheckImpl, shr *query.St
|
|||
thc.setServingState(serving, reason)
|
||||
|
||||
// notify downstream for master change
|
||||
hc.updateHealth(thc.SimpleCopy(), shr, currentTarget, trivialNonMasterUpdate, isMasterUpdate, isMasterChange)
|
||||
hc.updateHealth(thc.SimpleCopy(), prevTarget, trivialUpdate, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -241,6 +239,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
|
|||
hc.connsWG.Done()
|
||||
}()
|
||||
|
||||
// Initialize error counter
|
||||
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 0)
|
||||
|
||||
retryDelay := hc.retryDelay
|
||||
for {
|
||||
streamCtx, streamCancel := context.WithCancel(thc.ctx)
|
||||
|
@ -287,12 +288,14 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
|
|||
streamCancel()
|
||||
|
||||
if err != nil {
|
||||
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1)
|
||||
if strings.Contains(err.Error(), "health stats mismatch") {
|
||||
hc.deleteTablet(thc.Tablet)
|
||||
return
|
||||
}
|
||||
res := thc.SimpleCopy()
|
||||
hc.broadcast(res)
|
||||
// trivialUpdate = false because this is an error
|
||||
// isPrimaryUp = false because we did not get a healthy response
|
||||
hc.updateHealth(thc.SimpleCopy(), thc.Target, false, false)
|
||||
}
|
||||
// If there was a timeout send an error. We do this after stream has returned.
|
||||
// This will ensure that this update prevails over any previous message that
|
||||
|
@ -301,7 +304,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
|
|||
thc.LastError = fmt.Errorf("healthcheck timed out (latest %v)", thc.lastResponseTimestamp)
|
||||
thc.setServingState(false, thc.LastError.Error())
|
||||
hcErrorCounters.Add([]string{thc.Target.Keyspace, thc.Target.Shard, topoproto.TabletTypeLString(thc.Target.TabletType)}, 1)
|
||||
hc.broadcast(thc.SimpleCopy())
|
||||
// trivialUpdate = false because this is an error
|
||||
// isPrimaryUp = false because we did not get a healthy response within the timeout
|
||||
hc.updateHealth(thc.SimpleCopy(), thc.Target, false, false)
|
||||
}
|
||||
|
||||
// Streaming RPC failed e.g. because vttablet was restarted or took too long.
|
||||
|
|
Загрузка…
Ссылка в новой задаче