зеркало из https://github.com/github/vitess-gh.git
Merge pull request #4832 from dweitzman/check_last_healthy_time_streaming
tabletmanager: only report healthy if a health check has succeeded recently
This commit is contained in:
Коммит
46b769c063
|
@ -22,10 +22,11 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/context"
|
||||
"vitess.io/vitess/go/vt/logutil"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
"vitess.io/vitess/go/vt/topo/memorytopo"
|
||||
"vitess.io/vitess/go/vt/vttablet/grpcqueryservice"
|
||||
"vitess.io/vitess/go/vt/vttablet/queryservice"
|
||||
|
@ -33,9 +34,6 @@ import (
|
|||
"vitess.io/vitess/go/vt/vttablet/tmclient"
|
||||
"vitess.io/vitess/go/vt/wrangler"
|
||||
"vitess.io/vitess/go/vt/wrangler/testlib"
|
||||
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
// streamHealthTabletServer is a local QueryService implementation to support the tests
|
||||
|
@ -91,7 +89,7 @@ func (s *streamHealthTabletServer) streamHealthUnregister(id int) error {
|
|||
}
|
||||
|
||||
// BroadcastHealth will broadcast the current health to all listeners
|
||||
func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
|
||||
func (s *streamHealthTabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
|
||||
shr := &querypb.StreamHealthResponse{
|
||||
TabletExternallyReparentedTimestamp: terTimestamp,
|
||||
RealtimeStats: stats,
|
||||
|
@ -137,7 +135,7 @@ func TestTabletData(t *testing.T) {
|
|||
case <-stop:
|
||||
return
|
||||
default:
|
||||
shsq.BroadcastHealth(42, stats)
|
||||
shsq.BroadcastHealth(42, stats, time.Minute)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
|
|
@ -29,17 +29,15 @@ import (
|
|||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/mysql"
|
||||
"vitess.io/vitess/go/sqltypes"
|
||||
"vitess.io/vitess/go/vt/callerid"
|
||||
"vitess.io/vitess/go/vt/log"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
"vitess.io/vitess/go/vt/sqlparser"
|
||||
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
|
||||
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
)
|
||||
|
||||
func TestSimpleRead(t *testing.T) {
|
||||
|
@ -453,7 +451,7 @@ func TestHealth(t *testing.T) {
|
|||
|
||||
func TestStreamHealth(t *testing.T) {
|
||||
var health *querypb.StreamHealthResponse
|
||||
framework.Server.BroadcastHealth(0, nil)
|
||||
framework.Server.BroadcastHealth(0, nil, time.Minute)
|
||||
if err := framework.Server.StreamHealth(context.Background(), func(shr *querypb.StreamHealthResponse) error {
|
||||
health = shr
|
||||
return io.EOF
|
||||
|
@ -465,6 +463,23 @@ func TestStreamHealth(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStreamHealth_Expired(t *testing.T) {
|
||||
var health *querypb.StreamHealthResponse
|
||||
framework.Server.BroadcastHealth(0, nil, time.Millisecond)
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond * 100)
|
||||
defer cancel()
|
||||
if err := framework.Server.StreamHealth(ctx, func(shr *querypb.StreamHealthResponse) error {
|
||||
health = shr
|
||||
return io.EOF
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if health != nil {
|
||||
t.Errorf("Health: %v, want %v", health, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func TestQueryStats(t *testing.T) {
|
||||
client := framework.NewClient()
|
||||
vstart := framework.DebugVars()
|
||||
|
|
|
@ -24,24 +24,21 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/event"
|
||||
"vitess.io/vitess/go/trace"
|
||||
"vitess.io/vitess/go/vt/key"
|
||||
"vitess.io/vitess/go/vt/log"
|
||||
"vitess.io/vitess/go/vt/mysqlctl"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
"vitess.io/vitess/go/vt/topo"
|
||||
"vitess.io/vitess/go/vt/topo/topoproto"
|
||||
"vitess.io/vitess/go/vt/vterrors"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletmanager/events"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
|
||||
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -105,6 +102,7 @@ func (agent *ActionAgent) broadcastHealth() {
|
|||
replicationDelay := agent._replicationDelay
|
||||
healthError := agent._healthy
|
||||
terTime := agent._tabletExternallyReparentedTime
|
||||
healthyTime := agent._healthyTime
|
||||
agent.mutex.Unlock()
|
||||
|
||||
// send it to our observers
|
||||
|
@ -116,12 +114,17 @@ func (agent *ActionAgent) broadcastHealth() {
|
|||
stats.Qps = tabletenv.QPSRates.TotalRate()
|
||||
if healthError != nil {
|
||||
stats.HealthError = healthError.Error()
|
||||
} else {
|
||||
timeSinceLastCheck := time.Since(healthyTime)
|
||||
if timeSinceLastCheck > *healthCheckInterval*3 {
|
||||
stats.HealthError = fmt.Sprintf("last health check is too old: %s > %s", timeSinceLastCheck, *healthCheckInterval*3)
|
||||
}
|
||||
}
|
||||
var ts int64
|
||||
if !terTime.IsZero() {
|
||||
ts = terTime.Unix()
|
||||
}
|
||||
go agent.QueryServiceControl.BroadcastHealth(ts, stats)
|
||||
go agent.QueryServiceControl.BroadcastHealth(ts, stats, *healthCheckInterval*3)
|
||||
}
|
||||
|
||||
// refreshTablet needs to be run after an action may have changed the current
|
||||
|
|
|
@ -76,7 +76,7 @@ type Controller interface {
|
|||
SchemaEngine() *schema.Engine
|
||||
|
||||
// BroadcastHealth sends the current health to all listeners
|
||||
BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats)
|
||||
BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration)
|
||||
|
||||
// HeartbeatLag returns the current lag as calculated by the heartbeat
|
||||
// package, if heartbeat is enabled. Otherwise returns 0.
|
||||
|
|
|
@ -30,7 +30,6 @@ import (
|
|||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"vitess.io/vitess/go/acl"
|
||||
"vitess.io/vitess/go/history"
|
||||
"vitess.io/vitess/go/mysql"
|
||||
|
@ -44,6 +43,10 @@ import (
|
|||
"vitess.io/vitess/go/vt/dbconnpool"
|
||||
"vitess.io/vitess/go/vt/log"
|
||||
"vitess.io/vitess/go/vt/logutil"
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
"vitess.io/vitess/go/vt/sqlparser"
|
||||
"vitess.io/vitess/go/vt/srvtopo"
|
||||
"vitess.io/vitess/go/vt/tableacl"
|
||||
|
@ -61,11 +64,6 @@ import (
|
|||
"vitess.io/vitess/go/vt/vttablet/tabletserver/txserializer"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"
|
||||
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
|
||||
|
||||
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
|
||||
querypb "vitess.io/vitess/go/vt/proto/query"
|
||||
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
|
||||
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -184,10 +182,11 @@ type TabletServer struct {
|
|||
topoServer *topo.Server
|
||||
|
||||
// streamHealthMutex protects all the following fields
|
||||
streamHealthMutex sync.Mutex
|
||||
streamHealthIndex int
|
||||
streamHealthMap map[int]chan<- *querypb.StreamHealthResponse
|
||||
lastStreamHealthResponse *querypb.StreamHealthResponse
|
||||
streamHealthMutex sync.Mutex
|
||||
streamHealthIndex int
|
||||
streamHealthMap map[int]chan<- *querypb.StreamHealthResponse
|
||||
lastStreamHealthResponse *querypb.StreamHealthResponse
|
||||
lastStreamHealthExpiration time.Time
|
||||
|
||||
// history records changes in state for display on the status page.
|
||||
// It has its own internal mutex.
|
||||
|
@ -1823,9 +1822,10 @@ func createSplitQueryAlgorithmObject(
|
|||
func (tsv *TabletServer) StreamHealth(ctx context.Context, callback func(*querypb.StreamHealthResponse) error) error {
|
||||
tsv.streamHealthMutex.Lock()
|
||||
shr := tsv.lastStreamHealthResponse
|
||||
shrExpiration := tsv.lastStreamHealthExpiration
|
||||
tsv.streamHealthMutex.Unlock()
|
||||
// Send current state immediately.
|
||||
if shr != nil {
|
||||
if shr != nil && time.Now().Before(shrExpiration) {
|
||||
if err := callback(shr); err != nil {
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
|
@ -1871,14 +1871,14 @@ func (tsv *TabletServer) streamHealthUnregister(id int) {
|
|||
}
|
||||
|
||||
// BroadcastHealth will broadcast the current health to all listeners
|
||||
func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
|
||||
func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
|
||||
tsv.mu.Lock()
|
||||
target := tsv.target
|
||||
tsv.mu.Unlock()
|
||||
shr := &querypb.StreamHealthResponse{
|
||||
Target: &target,
|
||||
TabletAlias: &tsv.alias,
|
||||
Serving: tsv.IsServing(),
|
||||
Target: &target,
|
||||
TabletAlias: &tsv.alias,
|
||||
Serving: tsv.IsServing(),
|
||||
TabletExternallyReparentedTimestamp: terTimestamp,
|
||||
RealtimeStats: stats,
|
||||
}
|
||||
|
@ -1893,6 +1893,7 @@ func (tsv *TabletServer) BroadcastHealth(terTimestamp int64, stats *querypb.Real
|
|||
}
|
||||
}
|
||||
tsv.lastStreamHealthResponse = shr
|
||||
tsv.lastStreamHealthExpiration = time.Now().Add(maxCache)
|
||||
}
|
||||
|
||||
// HeartbeatLag returns the current lag as calculated by the heartbeat
|
||||
|
|
|
@ -187,7 +187,7 @@ func (tqsc *Controller) SchemaEngine() *schema.Engine {
|
|||
}
|
||||
|
||||
// BroadcastHealth is part of the tabletserver.Controller interface
|
||||
func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats) {
|
||||
func (tqsc *Controller) BroadcastHealth(terTimestamp int64, stats *querypb.RealtimeStats, maxCache time.Duration) {
|
||||
tqsc.mu.Lock()
|
||||
defer tqsc.mu.Unlock()
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче