diff --git a/go.mod b/go.mod index ad6b724..8b62ae5 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,9 @@ go 1.18 require ( github.com/coreos/etcd v3.3.27+incompatible github.com/google/uuid v1.1.2 + github.com/prometheus/client_golang v1.12.2 github.com/stretchr/testify v1.7.2 + go.etcd.io/etcd/pkg/v3 v3.5.4 go.uber.org/zap v1.21.0 golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f google.golang.org/grpc v1.47.0 @@ -14,32 +16,24 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/coreos/bbolt v1.3.2 // indirect github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect - github.com/dustin/go-humanize v1.0.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/btree v1.1.2 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect - github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/jonboulle/clockwork v0.3.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect - github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - go.etcd.io/etcd/pkg/v3 v3.5.4 // 
indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 // indirect @@ -48,8 +42,6 @@ require ( golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index f19e09a..b9aa67d 100644 --- a/go.sum +++ b/go.sum @@ -52,7 +52,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= diff --git a/internal/testutil/testutil.go b/internal/testutil/testutil.go index 08f2c8f..fc36f3d 100644 --- a/internal/testutil/testutil.go +++ b/internal/testutil/testutil.go @@ -66,13 +66,13 @@ func EventKeys(events []*mvccpb.Event) []string { } type TimestampedEvent interface { - GetModRev() int64 + GetRevision() int64 } func EventModRevs[T TimestampedEvent](events []T) []int64 { ret := make([]int64, len(events)) for i, event := range events { - ret[i] = event.GetModRev() + ret[i] = event.GetRevision() } return ret } diff --git a/internal/util/list.go b/internal/util/list.go new file mode 
100644 index 0000000..0a6bab6 --- /dev/null +++ b/internal/util/list.go @@ -0,0 +1,96 @@ +package util + +// List was adopted from https://gist.github.com/pje/90e727f80685c78a6c1cfff35f62155a. +// Replace with container/list once it's generic. +type List[T any] struct { + root Element[T] + Len int +} + +func (l *List[T]) First() *Element[T] { + if l.Len == 0 { + return nil + } + return l.root.next +} + +func (l *List[T]) Last() *Element[T] { + if l.Len == 0 { + return nil + } + return l.root.prev +} + +func (l *List[T]) PushFront(v T) *Element[T] { + if l.root.next == nil { + l.init() + } + return l.InsertAfter(v, &l.root) +} + +func (l *List[T]) PushBack(v T) *Element[T] { + if l.root.next == nil { + l.init() + } + return l.InsertAfter(v, l.root.prev) +} + +func (l *List[T]) Remove(e *Element[T]) T { + if e.list == l { + l.remove(e) + } + return e.Value +} + +func (l *List[T]) init() { + l.root = *new(Element[T]) + l.root.next = &l.root + l.root.prev = &l.root +} + +func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] { + e.prev = at + e.next = at.next + e.prev.next = e + e.next.prev = e + e.list = l + l.Len++ + return e +} + +func (l *List[T]) InsertAfter(v T, at *Element[T]) *Element[T] { + e := Element[T]{Value: v} + return l.insertAfter(&e, at) +} + +func (l *List[T]) remove(e *Element[T]) { + e.prev.next = e.next + e.next.prev = e.prev + e.next = nil + e.prev = nil + e.list = nil + l.Len-- +} + +type Element[T any] struct { + prev *Element[T] + next *Element[T] + list *List[T] + Value T +} + +func (e *Element[T]) Next() *Element[T] { + n := e.next + if e.list == nil || n == &e.list.root { + return nil + } + return n +} + +func (e *Element[T]) Prev() *Element[T] { + p := e.prev + if e.list == nil || p == &e.list.root { + return nil + } + return p +} diff --git a/internal/util/metrics.go b/internal/util/metrics.go index 34f086b..0ebf20a 100644 --- a/internal/util/metrics.go +++ b/internal/util/metrics.go @@ -3,41 +3,27 @@ package util import
"github.com/prometheus/client_golang/prometheus" var ( - currentWatchRev = prometheus.NewGauge( + timeBufferVisibleMax = prometheus.NewGauge( prometheus.GaugeOpts{ - Name: "metaetcd_current_watch_rev", - Help: "The last meta cluster revision observed.", + Name: "metaetcd_time_buffer_visible_max", + Help: "Max revision visible from the time buffer", }) - watchBufferLength = prometheus.NewGauge( + timeBufferLength = prometheus.NewGauge( prometheus.GaugeOpts{ - Name: "metaetcd_watch_buffer_len", - Help: "The length of the watch buffer.", + Name: "metaetcd_time_buffer_len", + Help: "The length of the time buffer.", }) - watchEventCount = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "metaetcd_watch_event_count", - Help: "The total watch events that have been pushed into the buffer.", - }) - - watchGapTimeoutCount = prometheus.NewCounter( + timeBufferTimeoutCount = prometheus.NewCounter( prometheus.CounterOpts{ - Name: "metaetcd_watch_gap_timeout_count", - Help: "The number of watch event gaps that were never filled.", - }) - - watchLatency = prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: "metaetcd_watch_latency_seconds", - Help: "The time between a watch event being received and exposed.", + Name: "metaetcd_time_buffer_timeouts_count", + Help: "The number of buffer event gaps that were never filled.", }) ) func init() { - prometheus.MustRegister(currentWatchRev) - prometheus.MustRegister(watchBufferLength) - prometheus.MustRegister(watchEventCount) - prometheus.MustRegister(watchGapTimeoutCount) - prometheus.MustRegister(watchLatency) + prometheus.MustRegister(timeBufferVisibleMax) + prometheus.MustRegister(timeBufferLength) + prometheus.MustRegister(timeBufferTimeoutCount) } diff --git a/internal/util/timebuffer.go b/internal/util/timebuffer.go index 11ada88..0d24680 100644 --- a/internal/util/timebuffer.go +++ b/internal/util/timebuffer.go @@ -1,7 +1,6 @@ package util import ( - "container/list" "context" "sync" "time" @@ -11,32 +10,33 @@ import 
( type BufferableEvent[T any] interface { GetAge() time.Duration - GetModRev() int64 - InRange(T) bool + GetRevision() int64 + Matches(query T) bool } +// TimeBuffer buffers events and sorts them by their logical timestamp. +// +// Events are only visible to readers when the preceding event has been received. +// If the preceding event is never received, the following event will still become +// visible after a (wallclock) timeout period in order to prevent deadlocks caused by +// dropped events. +// +// In practice, it is useful for merging streams of events that may be out of order due +// to network latency, and have the possibility of missing events due to network partitions. type TimeBuffer[T any, TT BufferableEvent[T]] struct { - mut sync.Mutex - list *list.List - gapTimeout time.Duration - maxLen int - lowerBound, upperBound int64 - upperVal *list.Element - ch chan<- TT + mut sync.Mutex + list *List[TT] + gapTimeout time.Duration + len int + min, max int64 + cursor *Element[TT] + ch chan<- TT } -func NewTimeBuffer[T any, TT BufferableEvent[T]](gapTimeout time.Duration, maxLen int, ch chan<- TT) *TimeBuffer[T, TT] { - return &TimeBuffer[T, TT]{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, ch: ch, lowerBound: -1} +func NewTimeBuffer[T any, TT BufferableEvent[T]](gapTimeout time.Duration, len int, ch chan<- TT) *TimeBuffer[T, TT] { + return &TimeBuffer[T, TT]{list: &List[TT]{}, gapTimeout: gapTimeout, len: len, min: -1, ch: ch} } -func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64 { - t.mut.Lock() - defer t.mut.Unlock() - return t.upperBound -} - -func (t *TimeBuffer[T, TT]) MaxLen() int { return t.maxLen } - func (t *TimeBuffer[T, TT]) Run(ctx context.Context) { ticker := time.NewTicker(t.gapTimeout) defer ticker.Stop() @@ -50,144 +50,154 @@ func (t *TimeBuffer[T, TT]) Run(ctx context.Context) { } } -func (t *TimeBuffer[T, TT]) Push(event TT) { - watchEventCount.Inc() +func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64 { t.mut.Lock() defer
t.mut.Unlock() + return t.max +} +func (t *TimeBuffer[T, TT]) Len() int { return t.len } + +func (t *TimeBuffer[T, TT]) Push(event TT) { + t.mut.Lock() + defer t.mut.Unlock() t.pushUnlocked(event) t.bridgeGapUnlocked() } func (t *TimeBuffer[T, TT]) pushUnlocked(event TT) { - watchBufferLength.Inc() - lastEl := t.list.Back() + timeBufferLength.Inc() - // Case 1: first element - if lastEl == nil { - t.list.PushFront(event) - return - } - - // Case 2: outside of range - insert before or after - last := lastEl.Value.(TT) - if event.GetModRev() > last.GetModRev() { - t.list.PushBack(event) - return - } - - firstEl := t.list.Front() - first := firstEl.Value.(TT) - if event.GetModRev() < first.GetModRev() { - t.list.PushFront(event) - return - } - - // Case 3: find place between pairs of events + cursorItem := t.list.Last() // start at the newest event for { - firstEl = lastEl.Prev() - if firstEl == nil { - break + if cursorItem == nil { + t.list.PushFront(event) + return // the event precedes everything buffered (or the list is empty): insert at the front } - first = firstEl.Value.(TT) - if event.GetModRev() > first.GetModRev() { - t.list.InsertAfter(event, firstEl) - return + cursorEvent := cursorItem.Value + if event.GetRevision() < cursorEvent.GetRevision() { + cursorItem = cursorItem.Prev() + continue // keep scanning back in time } - lastEl = firstEl + + t.list.InsertAfter(event, cursorItem) + return } } func (t *TimeBuffer[T, TT]) trimUnlocked() { - item := t.list.Front() + item := t.list.First() for { - if t.list.Len() <= t.maxLen || item == nil { - return + if t.list.Len <= t.len || item == nil { + break } - event := item.Value.(TT) - + event := item.Value next := item.Next() - if t.upperBound > event.GetModRev() { - watchBufferLength.Dec() + + // Trim if the buffer is too long + if t.max > event.GetRevision() { + timeBufferLength.Dec() t.list.Remove(item) } + + // Keep track of the buffer's tail if next != nil { - newFront := next.Value.(TT) - t.lowerBound = newFront.GetModRev() + t.min = next.Value.GetRevision() } item
= next } } -func (t *TimeBuffer[T, TT]) Range(start int64, ivl T) ([]TT, int64, int64) { +func (t *TimeBuffer[T, TT]) All() (slice []TT) { + t.mut.Lock() + defer t.mut.Unlock() + + item := t.list.First() // start at the oldest event + for { + if item == nil { + break + } + slice = append(slice, item.Value) + item = item.Next() + } + return +} + +func (t *TimeBuffer[T, TT]) Range(start int64, query T) ([]TT, int64, int64) { t.mut.Lock() defer t.mut.Unlock() slice := []TT{} - pos := t.list.Front() + item := t.list.First() // start at the oldest event for { - if pos == nil { + if item == nil { break } - e := pos.Value.(TT) - if e.GetModRev() <= start { - pos = pos.Next() + event := item.Value + if event.GetRevision() <= start { + item = item.Next() continue } - if e.GetModRev() > t.upperBound { + if event.GetRevision() > t.max { break } - if e.InRange(ivl) { - slice = append(slice, e) + if event.Matches(query) { + slice = append(slice, event) } - pos = pos.Next() + item = item.Next() } - return slice, t.lowerBound, t.upperBound + return slice, t.min, t.max } func (t *TimeBuffer[T, TT]) bridgeGapUnlocked() { - val := t.upperVal - if val == nil { - val = t.list.Front() + item := t.cursor // start at the oldest visible event and scan forwards + if item == nil { + item = t.list.First() } for { - if val == nil || val.Value == nil { - break + if item == nil { + break // at end of list } - valE := val.Value.(TT) + event := item.Value - if valE.GetModRev() <= t.upperBound { - val = val.Next() - continue // this gap has already been closed + // Not a gap - keep scanning + if event.GetRevision() <= t.max { + item = item.Next() + continue } - isNextEvent := valE.GetModRev() == t.upperBound+1 - age := valE.GetAge() + isNextEvent := event.GetRevision() == t.max+1 + age := event.GetAge() hasTimedout := age > t.gapTimeout - if hasTimedout && !isNextEvent { - zap.L().Warn("filled gap in watch stream", zap.Int64("from", t.upperBound), zap.Int64("to", valE.GetModRev())) - 
watchGapTimeoutCount.Inc() + + // Report on gap timeouts + if !isNextEvent && hasTimedout { + zap.L().Warn("filled gap in event buffer", zap.Int64("from", t.max), zap.Int64("to", event.GetRevision())) + timeBufferTimeoutCount.Inc() } + + // We've reached a gap and it hasn't timed out yet if !isNextEvent && !hasTimedout { break } - if t.ch != nil { - t.ch <- valE - } - t.upperBound = valE.GetModRev() - t.upperVal = val - if t.lowerBound == -1 { - t.lowerBound = valE.GetModRev() - } - - currentWatchRev.Set(float64(t.upperBound)) - watchLatency.Observe(age.Seconds()) - - val = val.Next() + t.advanceCursorUnlocked(item, event) + item = item.Next() continue } t.trimUnlocked() } + +func (t *TimeBuffer[T, TT]) advanceCursorUnlocked(item *Element[TT], event TT) { + t.ch <- event + t.max = event.GetRevision() + t.cursor = item + + if t.min == -1 { + t.min = event.GetRevision() + } + + timeBufferVisibleMax.Set(float64(t.max)) +} diff --git a/internal/util/timebuffer_test.go b/internal/util/timebuffer_test.go index e2aef3e..ed54424 100644 --- a/internal/util/timebuffer_test.go +++ b/internal/util/timebuffer_test.go @@ -1,132 +1,102 @@ package util import ( - "context" + "math/rand" "testing" "time" - "github.com/stretchr/testify/assert" - "go.etcd.io/etcd/pkg/v3/adt" - "github.com/Azure/metaetcd/internal/testutil" + "github.com/stretchr/testify/assert" ) -func TestTimeBufferOrdering(t *testing.T) { - ch := make(chan *testEvent, 100) - b := NewTimeBuffer[adt.Interval](time.Millisecond*10, 4, ch) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - done := make(chan struct{}) - go func() { - b.Run(ctx) - close(done) - }() - - // The first event starts at rev 2, wait for the initial gap - b.Push(newTestEvent(2)) - buf, lb, up := b.Range(0, defaultKeyRange) - assert.Equal(t, int64(-1), lb) - assert.Equal(t, int64(0), up) - assert.Len(t, buf, 0) - <-ch - - // Create gap - b.Push(newTestEvent(4)) - - // Full range - but only the first should be returned 
since there is a gap - buf, lb, up = b.Range(0, defaultKeyRange) - assert.Equal(t, int64(2), lb) - assert.Equal(t, int64(2), up) - assert.Equal(t, []int64{2}, testutil.EventModRevs(buf)) - - // Fill the gap - b.Push(newTestEvent(3)) - - // Full range - buf, lb, up = b.Range(0, defaultKeyRange) - assert.Equal(t, int64(2), lb) - assert.Equal(t, int64(4), up) - assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf)) - - // Partial range - buf, lb, up = b.Range(2, defaultKeyRange) - assert.Equal(t, int64(2), lb) - assert.Equal(t, int64(4), up) - assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf)) - - // Push event to create another gap - b.Push(newTestEvent(6)) - - // This gap is never filled - wait for the timeout - for { - buf, lb, up = b.Range(0, defaultKeyRange) - if len(buf) == 4 { - assert.Equal(t, int64(2), lb) - assert.Equal(t, int64(6), up) - break - } - time.Sleep(time.Millisecond * 5) +// FuzzTimeBufferPush proves that the buffer is always sorted when given random input. 
+func FuzzTimeBufferPush(f *testing.F) { + numbers := make([]int64, 1000) + for i := range numbers { + numbers[i] = int64(rand.Int()) + } + for _, n := range numbers { + f.Add(n) } - assert.Equal(t, []int64{2, 3, 4, 6}, testutil.EventModRevs(buf)) - // Push another event, which will cause the earliest event to fall off - b.Push(newTestEvent(7)) - buf, lb, up = b.Range(0, defaultKeyRange) - assert.Equal(t, int64(3), lb) - assert.Equal(t, int64(7), up) - assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf)) + b := NewTimeBuffer[struct{}](time.Millisecond*10, 4, make(chan<- *testEvent, 100)) + f.Fuzz(func(t *testing.T, n int64) { + b.Push(newTestEvent(n)) - cancel() - <-done + var prev int64 + for i, n := range b.All() { + if i == 0 { + prev = n.GetRevision() + continue + } + if n.GetRevision() < prev { + t.Errorf("buffer is out of order - %d < %d", n.GetRevision(), prev) + } + prev = n.GetRevision() + } + }) } -func TestTimeBufferBridgeGap(t *testing.T) { - b := NewTimeBuffer[adt.Interval, *testEvent](time.Second, 10, nil) +// TestTimeBufferCursor proves that buffer's cursor moves forward as out-of-order events (without gaps) are added. +func TestTimeBufferCursor(t *testing.T) { + b := NewTimeBuffer[struct{}](time.Second, 10, make(chan<- *testEvent, 100)) - events := []int64{4, 3, 1, 2} - expectedUpperBounds := []int64{0, 0, 1, 4} + events := []int64{4, 3, 1, 2, 5} + expectedUpperBounds := []int64{0, 0, 1, 4, 5} for i, rev := range events { b.Push(newTestEvent(rev)) - assert.Equal(t, expectedUpperBounds[i], b.upperBound, "iteration: %d", i) + assert.Equal(t, expectedUpperBounds[i], b.LatestVisibleRev(), "iteration: %d", i) } } -func TestTimeBufferTrimWhenGap(t *testing.T) { - b := NewTimeBuffer[adt.Interval, *testEvent](time.Millisecond, 2, nil) +// TestTimeBufferPruningCursor proves that events are not pruned until they are visible. 
+func TestTimeBufferPruningCursor(t *testing.T) { + b := NewTimeBuffer[struct{}](time.Millisecond, 2, make(chan<- *testEvent, 100)) // Fill the buffer and more const n = 10 for i := 0; i < n; i++ { b.Push(newTestEvent(int64(i + 3))) } - assert.Equal(t, n, b.list.Len()) + assert.Equal(t, n, b.list.Len) // Bridge the gap and prove the buffer was shortened time.Sleep(time.Millisecond * 2) b.bridgeGapUnlocked() - assert.Equal(t, 2, b.list.Len()) + assert.Equal(t, 2, b.Len()) } -var defaultKeyRange = adt.NewStringAffineInterval("foo", "foo0") +// TestTimeBufferRange proves that ranges filter on start revision, the visibility window, and the query. +func TestTimeBufferRange(t *testing.T) { + b := NewTimeBuffer[struct{}](time.Second, 10, make(chan<- *testEvent, 100)) + + for i, rev := range []int64{4, 3, 1, 2, 5, 6, 7, 9} { + event := newTestEvent(rev) + if i == 0 { + event.Invisible = true + } + b.Push(event) + } + + results, min, max := b.Range(2, struct{}{}) + assert.Equal(t, int64(1), min) + assert.Equal(t, int64(7), max) + assert.Equal(t, []int64{3, 5, 6, 7}, testutil.EventModRevs(results)) +} type testEvent struct { Rev int64 - Key adt.Interval Timestamp time.Time + Invisible bool } func newTestEvent(rev int64) *testEvent { return &testEvent{ Rev: rev, Timestamp: time.Now(), - Key: adt.NewStringAffinePoint("foo/test"), } } -func (e *testEvent) GetAge() time.Duration { return time.Since(e.Timestamp) } -func (e *testEvent) GetModRev() int64 { return e.Rev } -func (e *testEvent) InRange(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 } - -// TODO: Don't use adt here and better tests for InRange +func (e *testEvent) GetAge() time.Duration { return time.Since(e.Timestamp) } +func (e *testEvent) GetRevision() int64 { return e.Rev } +func (e *testEvent) Matches(query struct{}) bool { return !e.Invisible } diff --git a/internal/watch/metrics.go b/internal/watch/metrics.go index 6a5c006..7068f14 100644 --- a/internal/watch/metrics.go +++ b/internal/watch/metrics.go 
@@ -8,8 +8,15 @@ var ( Name: "metaetcd_stale_watch_count", Help: "Number of stale watch connections.", }) + + watchEventCount = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "metaetcd_watch_event_count", + Help: "The total watch events that have been pushed into the buffer.", + }) ) func init() { prometheus.MustRegister(staleWatchCount) + prometheus.MustRegister(watchEventCount) } diff --git a/internal/watch/mux.go b/internal/watch/mux.go index 7dfd150..9540252 100644 --- a/internal/watch/mux.go +++ b/internal/watch/mux.go @@ -60,7 +60,7 @@ func (m *Mux) StartWatch(client *clientv3.Client) (*Status, error) { } nextEvent := (resp.Header.Revision + 1) - startRev := nextEvent - int64(m.buffer.MaxLen()) + startRev := nextEvent - int64(m.buffer.Len()) if startRev < 0 { startRev = 0 } @@ -87,6 +87,7 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) { for _, event := range events { zap.L().Info("observed watch event", zap.Int64("metaRev", meta)) + watchEventCount.Inc() m.buffer.Push(&eventWrapper{ Event: event, Timestamp: time.Now(), @@ -164,5 +165,5 @@ type eventWrapper struct { } func (e *eventWrapper) GetAge() time.Duration { return time.Since(e.Timestamp) } -func (e *eventWrapper) GetModRev() int64 { return e.Kv.ModRevision } -func (e *eventWrapper) InRange(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 } +func (e *eventWrapper) GetRevision() int64 { return e.Kv.ModRevision } +func (e *eventWrapper) Matches(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 }