This commit is contained in:
Jordan Olshevski 2022-07-25 12:08:47 -05:00
Родитель b5af21d782
Коммит 9cafd96172
9 изменённых файлов: 290 добавлений и 229 удалений

12
go.mod
Просмотреть файл

@ -5,7 +5,9 @@ go 1.18
require ( require (
github.com/coreos/etcd v3.3.27+incompatible github.com/coreos/etcd v3.3.27+incompatible
github.com/google/uuid v1.1.2 github.com/google/uuid v1.1.2
github.com/prometheus/client_golang v1.12.2
github.com/stretchr/testify v1.7.2 github.com/stretchr/testify v1.7.2
go.etcd.io/etcd/pkg/v3 v3.5.4
go.uber.org/zap v1.21.0 go.uber.org/zap v1.21.0
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
google.golang.org/grpc v1.47.0 google.golang.org/grpc v1.47.0
@ -14,32 +16,24 @@ require (
require ( require (
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/coreos/bbolt v1.3.2 // indirect
github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect github.com/golang/protobuf v1.5.2 // indirect
github.com/google/btree v1.1.2 // indirect github.com/google/btree v1.1.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.12.2 // indirect
github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect github.com/prometheus/procfs v0.7.3 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect github.com/soheilhy/cmux v0.1.5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.4 // indirect
go.uber.org/atomic v1.7.0 // indirect go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 // indirect golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9 // indirect
@ -48,8 +42,6 @@ require (
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
google.golang.org/protobuf v1.27.1 // indirect google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect sigs.k8s.io/yaml v1.3.0 // indirect
) )

1
go.sum
Просмотреть файл

@ -52,7 +52,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=

Просмотреть файл

@ -66,13 +66,13 @@ func EventKeys(events []*mvccpb.Event) []string {
} }
type TimestampedEvent interface { type TimestampedEvent interface {
GetModRev() int64 GetRevision() int64
} }
func EventModRevs[T TimestampedEvent](events []T) []int64 { func EventModRevs[T TimestampedEvent](events []T) []int64 {
ret := make([]int64, len(events)) ret := make([]int64, len(events))
for i, event := range events { for i, event := range events {
ret[i] = event.GetModRev() ret[i] = event.GetRevision()
} }
return ret return ret
} }

96
internal/util/list.go Normal file
Просмотреть файл

@ -0,0 +1,96 @@
package util
// List was adopted from https://gist.github.com/pje/90e727f80685c78a6c1cfff35f62155a.
// Replaec with container/list once it's generic.
type List[T any] struct {
root Element[T]
Len int
}
func (l *List[T]) First() *Element[T] {
if l.Len == 0 {
return nil
}
return l.root.next
}
func (l *List[T]) Last() *Element[T] {
if l.Len == 0 {
return nil
}
return l.root.prev
}
func (l *List[T]) PushFront(v T) *Element[T] {
if l.root.next == nil {
l.init()
}
return l.InsertAfter(v, &l.root)
}
func (l *List[T]) PushBack(v T) *Element[T] {
if l.root.next == nil {
l.init()
}
return l.InsertAfter(v, l.root.prev)
}
func (l *List[T]) Remove(e *Element[T]) T {
if e.list == l {
l.remove(e)
}
return e.Value
}
func (l *List[T]) init() {
l.root = *new(Element[T])
l.root.next = &l.root
l.root.prev = &l.root
}
func (l *List[T]) insertAfter(e *Element[T], at *Element[T]) *Element[T] {
e.prev = at
e.next = at.next
e.prev.next = e
e.next.prev = e
e.list = l
l.Len++
return e
}
func (l *List[T]) InsertAfter(v T, at *Element[T]) *Element[T] {
e := Element[T]{Value: v}
return l.insertAfter(&e, at)
}
func (l *List[T]) remove(e *Element[T]) {
e.prev.next = e.next
e.next.prev = e.prev
e.next = nil
e.prev = nil
e.list = nil
l.Len--
}
type Element[T any] struct {
prev *Element[T]
next *Element[T]
list *List[T]
Value T
}
func (e *Element[T]) Next() *Element[T] {
n := e.next
if e.list == nil || n == &e.list.root {
return nil
}
return n
}
func (e *Element[T]) Prev() *Element[T] {
p := e.prev
if e.list == nil || p == &e.list.root {
return nil
}
return p
}

Просмотреть файл

@ -3,41 +3,27 @@ package util
import "github.com/prometheus/client_golang/prometheus" import "github.com/prometheus/client_golang/prometheus"
var ( var (
currentWatchRev = prometheus.NewGauge( timeBufferVisibleMax = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "metaetcd_current_watch_rev", Name: "metaetcd_time_buffer_visible_max",
Help: "The last meta cluster revision observed.", Help: "Max revision visible from the time buffer",
}) })
watchBufferLength = prometheus.NewGauge( timeBufferLength = prometheus.NewGauge(
prometheus.GaugeOpts{ prometheus.GaugeOpts{
Name: "metaetcd_watch_buffer_len", Name: "metaetcd_time_buffer_len",
Help: "The length of the watch buffer.", Help: "The length of the time buffer.",
}) })
watchEventCount = prometheus.NewGauge( timeBufferTimeoutCount = prometheus.NewCounter(
prometheus.GaugeOpts{
Name: "metaetcd_watch_event_count",
Help: "The total watch events that have been pushed into the buffer.",
})
watchGapTimeoutCount = prometheus.NewCounter(
prometheus.CounterOpts{ prometheus.CounterOpts{
Name: "metaetcd_watch_gap_timeout_count", Name: "metaetcd_time_buffer_timeouts_count",
Help: "The number of watch event gaps that were never filled.", Help: "The number of buffer event gaps that were never filled.",
})
watchLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Name: "metaetcd_watch_latency_seconds",
Help: "The time between a watch event being received and exposed.",
}) })
) )
func init() { func init() {
prometheus.MustRegister(currentWatchRev) prometheus.MustRegister(timeBufferVisibleMax)
prometheus.MustRegister(watchBufferLength) prometheus.MustRegister(timeBufferLength)
prometheus.MustRegister(watchEventCount) prometheus.MustRegister(timeBufferTimeoutCount)
prometheus.MustRegister(watchGapTimeoutCount)
prometheus.MustRegister(watchLatency)
} }

Просмотреть файл

@ -1,7 +1,6 @@
package util package util
import ( import (
"container/list"
"context" "context"
"sync" "sync"
"time" "time"
@ -11,32 +10,33 @@ import (
type BufferableEvent[T any] interface { type BufferableEvent[T any] interface {
GetAge() time.Duration GetAge() time.Duration
GetModRev() int64 GetRevision() int64
InRange(T) bool Matches(query T) bool
} }
// TimeBuffer buffers events and sorts them by their logical timestamp.
//
// Events are only visible to readers when the preceding event has been received.
// If the preceding event is never received, the following event will still become
// visible after a (wallclock) timeout period in order to prevent deadlocks caused
// dropped events.
//
// In practice, it is useful for merging streams of events that may be out of order due
// to network latency, and have the possibility of missing events due to network partitions.
type TimeBuffer[T any, TT BufferableEvent[T]] struct { type TimeBuffer[T any, TT BufferableEvent[T]] struct {
mut sync.Mutex mut sync.Mutex
list *list.List list *List[TT]
gapTimeout time.Duration gapTimeout time.Duration
maxLen int len int
lowerBound, upperBound int64 min, max int64
upperVal *list.Element cursor *Element[TT]
ch chan<- TT ch chan<- TT
} }
func NewTimeBuffer[T any, TT BufferableEvent[T]](gapTimeout time.Duration, maxLen int, ch chan<- TT) *TimeBuffer[T, TT] { func NewTimeBuffer[T any, TT BufferableEvent[T]](gapTimeout time.Duration, len int, ch chan<- TT) *TimeBuffer[T, TT] {
return &TimeBuffer[T, TT]{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, ch: ch, lowerBound: -1} return &TimeBuffer[T, TT]{list: &List[TT]{}, gapTimeout: gapTimeout, len: len, min: -1, ch: ch}
} }
func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64 {
t.mut.Lock()
defer t.mut.Unlock()
return t.upperBound
}
func (t *TimeBuffer[T, TT]) MaxLen() int { return t.maxLen }
func (t *TimeBuffer[T, TT]) Run(ctx context.Context) { func (t *TimeBuffer[T, TT]) Run(ctx context.Context) {
ticker := time.NewTicker(t.gapTimeout) ticker := time.NewTicker(t.gapTimeout)
defer ticker.Stop() defer ticker.Stop()
@ -50,144 +50,154 @@ func (t *TimeBuffer[T, TT]) Run(ctx context.Context) {
} }
} }
func (t *TimeBuffer[T, TT]) Push(event TT) { func (t *TimeBuffer[T, TT]) LatestVisibleRev() int64 {
watchEventCount.Inc()
t.mut.Lock() t.mut.Lock()
defer t.mut.Unlock() defer t.mut.Unlock()
return t.max
}
func (t *TimeBuffer[T, TT]) Len() int { return t.len }
func (t *TimeBuffer[T, TT]) Push(event TT) {
t.mut.Lock()
defer t.mut.Unlock()
t.pushUnlocked(event) t.pushUnlocked(event)
t.bridgeGapUnlocked() t.bridgeGapUnlocked()
} }
func (t *TimeBuffer[T, TT]) pushUnlocked(event TT) { func (t *TimeBuffer[T, TT]) pushUnlocked(event TT) {
watchBufferLength.Inc() timeBufferLength.Inc()
lastEl := t.list.Back()
// Case 1: first element cursorItem := t.list.Last() // start at the newest event
if lastEl == nil {
t.list.PushFront(event)
return
}
// Case 2: outside of range - insert before or after
last := lastEl.Value.(TT)
if event.GetModRev() > last.GetModRev() {
t.list.PushBack(event)
return
}
firstEl := t.list.Front()
first := firstEl.Value.(TT)
if event.GetModRev() < first.GetModRev() {
t.list.PushFront(event)
return
}
// Case 3: find place between pairs of events
for { for {
firstEl = lastEl.Prev() if cursorItem == nil {
if firstEl == nil { t.list.PushFront(event)
break return // prepend to back of list
} }
first = firstEl.Value.(TT)
if event.GetModRev() > first.GetModRev() { cursorEvent := cursorItem.Value
t.list.InsertAfter(event, firstEl) if event.GetRevision() < cursorEvent.GetRevision() {
return cursorItem = cursorItem.Prev()
continue // keep scanning back in time
} }
lastEl = firstEl
t.list.InsertAfter(event, cursorItem)
return
} }
} }
func (t *TimeBuffer[T, TT]) trimUnlocked() { func (t *TimeBuffer[T, TT]) trimUnlocked() {
item := t.list.Front() item := t.list.First()
for { for {
if t.list.Len() <= t.maxLen || item == nil { if t.list.Len <= t.len || item == nil {
return break
} }
event := item.Value.(TT) event := item.Value
next := item.Next() next := item.Next()
if t.upperBound > event.GetModRev() {
watchBufferLength.Dec() // Trim if the buffer is too long
if t.max > event.GetRevision() {
timeBufferLength.Dec()
t.list.Remove(item) t.list.Remove(item)
} }
// Keep track of the buffer's tail
if next != nil { if next != nil {
newFront := next.Value.(TT) t.min = next.Value.GetRevision()
t.lowerBound = newFront.GetModRev()
} }
item = next item = next
} }
} }
func (t *TimeBuffer[T, TT]) Range(start int64, ivl T) ([]TT, int64, int64) { func (t *TimeBuffer[T, TT]) All() (slice []TT) {
t.mut.Lock()
defer t.mut.Unlock()
item := t.list.First() // start at the oldest event
for {
if item == nil {
break
}
slice = append(slice, item.Value)
item = item.Next()
}
return
}
func (t *TimeBuffer[T, TT]) Range(start int64, query T) ([]TT, int64, int64) {
t.mut.Lock() t.mut.Lock()
defer t.mut.Unlock() defer t.mut.Unlock()
slice := []TT{} slice := []TT{}
pos := t.list.Front() item := t.list.First() // start at the oldest event
for { for {
if pos == nil { if item == nil {
break break
} }
e := pos.Value.(TT) event := item.Value
if e.GetModRev() <= start { if event.GetRevision() <= start {
pos = pos.Next() item = item.Next()
continue continue
} }
if e.GetModRev() > t.upperBound { if event.GetRevision() > t.max {
break break
} }
if e.InRange(ivl) { if event.Matches(query) {
slice = append(slice, e) slice = append(slice, event)
} }
pos = pos.Next() item = item.Next()
} }
return slice, t.lowerBound, t.upperBound return slice, t.min, t.max
} }
func (t *TimeBuffer[T, TT]) bridgeGapUnlocked() { func (t *TimeBuffer[T, TT]) bridgeGapUnlocked() {
val := t.upperVal item := t.cursor // start at the oldest visible event and scan forwards
if val == nil { if item == nil {
val = t.list.Front() item = t.list.First()
} }
for { for {
if val == nil || val.Value == nil { if item == nil {
break break // at end of list
} }
valE := val.Value.(TT) event := item.Value
if valE.GetModRev() <= t.upperBound { // Not a gap - keep scanning
val = val.Next() if event.GetRevision() <= t.max {
continue // this gap has already been closed item = item.Next()
continue
} }
isNextEvent := valE.GetModRev() == t.upperBound+1 isNextEvent := event.GetRevision() == t.max+1
age := valE.GetAge() age := event.GetAge()
hasTimedout := age > t.gapTimeout hasTimedout := age > t.gapTimeout
if hasTimedout && !isNextEvent {
zap.L().Warn("filled gap in watch stream", zap.Int64("from", t.upperBound), zap.Int64("to", valE.GetModRev())) // Report on gap timeouts
watchGapTimeoutCount.Inc() if !isNextEvent && hasTimedout {
zap.L().Warn("filled gap in event buffer", zap.Int64("from", t.max), zap.Int64("to", event.GetRevision()))
timeBufferTimeoutCount.Inc()
} }
// We've reached a gap and it hasn't timed out yet
if !isNextEvent && !hasTimedout { if !isNextEvent && !hasTimedout {
break break
} }
if t.ch != nil { t.advanceCursorUnlocked(item, event)
t.ch <- valE item = item.Next()
}
t.upperBound = valE.GetModRev()
t.upperVal = val
if t.lowerBound == -1 {
t.lowerBound = valE.GetModRev()
}
currentWatchRev.Set(float64(t.upperBound))
watchLatency.Observe(age.Seconds())
val = val.Next()
continue continue
} }
t.trimUnlocked() t.trimUnlocked()
} }
func (t *TimeBuffer[T, TT]) advanceCursorUnlocked(item *Element[TT], event TT) {
t.ch <- event
t.max = event.GetRevision()
t.cursor = item
if t.min == -1 {
t.min = event.GetRevision()
}
timeBufferVisibleMax.Set(float64(t.max))
}

Просмотреть файл

@ -1,132 +1,102 @@
package util package util
import ( import (
"context" "math/rand"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/pkg/v3/adt"
"github.com/Azure/metaetcd/internal/testutil" "github.com/Azure/metaetcd/internal/testutil"
"github.com/stretchr/testify/assert"
) )
func TestTimeBufferOrdering(t *testing.T) { // FuzzTimeBufferPush proves that the buffer is always sorted when given random input.
ch := make(chan *testEvent, 100) func FuzzTimeBufferPush(f *testing.F) {
b := NewTimeBuffer[adt.Interval](time.Millisecond*10, 4, ch) numbers := make([]int64, 1000)
for i := range numbers {
ctx, cancel := context.WithCancel(context.Background()) numbers[i] = int64(rand.Int())
defer cancel() }
for _, n := range numbers {
done := make(chan struct{}) f.Add(n)
go func() {
b.Run(ctx)
close(done)
}()
// The first event starts at rev 2, wait for the initial gap
b.Push(newTestEvent(2))
buf, lb, up := b.Range(0, defaultKeyRange)
assert.Equal(t, int64(-1), lb)
assert.Equal(t, int64(0), up)
assert.Len(t, buf, 0)
<-ch
// Create gap
b.Push(newTestEvent(4))
// Full range - but only the first should be returned since there is a gap
buf, lb, up = b.Range(0, defaultKeyRange)
assert.Equal(t, int64(2), lb)
assert.Equal(t, int64(2), up)
assert.Equal(t, []int64{2}, testutil.EventModRevs(buf))
// Fill the gap
b.Push(newTestEvent(3))
// Full range
buf, lb, up = b.Range(0, defaultKeyRange)
assert.Equal(t, int64(2), lb)
assert.Equal(t, int64(4), up)
assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf))
// Partial range
buf, lb, up = b.Range(2, defaultKeyRange)
assert.Equal(t, int64(2), lb)
assert.Equal(t, int64(4), up)
assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf))
// Push event to create another gap
b.Push(newTestEvent(6))
// This gap is never filled - wait for the timeout
for {
buf, lb, up = b.Range(0, defaultKeyRange)
if len(buf) == 4 {
assert.Equal(t, int64(2), lb)
assert.Equal(t, int64(6), up)
break
}
time.Sleep(time.Millisecond * 5)
} }
assert.Equal(t, []int64{2, 3, 4, 6}, testutil.EventModRevs(buf))
// Push another event, which will cause the earliest event to fall off b := NewTimeBuffer[struct{}](time.Millisecond*10, 4, make(chan<- *testEvent, 100))
b.Push(newTestEvent(7)) f.Fuzz(func(t *testing.T, n int64) {
buf, lb, up = b.Range(0, defaultKeyRange) b.Push(newTestEvent(n))
assert.Equal(t, int64(3), lb)
assert.Equal(t, int64(7), up)
assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf))
cancel() var prev int64
<-done for i, n := range b.All() {
if i == 0 {
prev = n.GetRevision()
continue
}
if n.GetRevision() < prev {
t.Errorf("buffer is out of order - %d < %d", n.GetRevision(), prev)
}
prev = n.GetRevision()
}
})
} }
func TestTimeBufferBridgeGap(t *testing.T) { // TestTimeBufferCursor proves that buffer's cursor moves forward as out-of-order events (without gaps) are added.
b := NewTimeBuffer[adt.Interval, *testEvent](time.Second, 10, nil) func TestTimeBufferCursor(t *testing.T) {
b := NewTimeBuffer[struct{}](time.Second, 10, make(chan<- *testEvent, 100))
events := []int64{4, 3, 1, 2} events := []int64{4, 3, 1, 2, 5}
expectedUpperBounds := []int64{0, 0, 1, 4} expectedUpperBounds := []int64{0, 0, 1, 4, 5}
for i, rev := range events { for i, rev := range events {
b.Push(newTestEvent(rev)) b.Push(newTestEvent(rev))
assert.Equal(t, expectedUpperBounds[i], b.upperBound, "iteration: %d", i) assert.Equal(t, expectedUpperBounds[i], b.LatestVisibleRev(), "iteration: %d", i)
} }
} }
func TestTimeBufferTrimWhenGap(t *testing.T) { // TestTimeBufferPruningCursor proves that events are not pruned until they are visible.
b := NewTimeBuffer[adt.Interval, *testEvent](time.Millisecond, 2, nil) func TestTimeBufferPruningCursor(t *testing.T) {
b := NewTimeBuffer[struct{}](time.Millisecond, 2, make(chan<- *testEvent, 100))
// Fill the buffer and more // Fill the buffer and more
const n = 10 const n = 10
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
b.Push(newTestEvent(int64(i + 3))) b.Push(newTestEvent(int64(i + 3)))
} }
assert.Equal(t, n, b.list.Len()) assert.Equal(t, n, b.list.Len)
// Bridge the gap and prove the buffer was shortened // Bridge the gap and prove the buffer was shortened
time.Sleep(time.Millisecond * 2) time.Sleep(time.Millisecond * 2)
b.bridgeGapUnlocked() b.bridgeGapUnlocked()
assert.Equal(t, 2, b.list.Len()) assert.Equal(t, 2, b.Len())
} }
var defaultKeyRange = adt.NewStringAffineInterval("foo", "foo0") // TestTimeBufferRange proves that ranges filter on start revision, the visibility window, and the query.
func TestTimeBufferRange(t *testing.T) {
b := NewTimeBuffer[struct{}](time.Second, 10, make(chan<- *testEvent, 100))
for i, rev := range []int64{4, 3, 1, 2, 5, 6, 7, 9} {
event := newTestEvent(rev)
if i == 0 {
event.Invisible = true
}
b.Push(event)
}
results, min, max := b.Range(2, struct{}{})
assert.Equal(t, int64(1), min)
assert.Equal(t, int64(7), max)
assert.Equal(t, []int64{3, 5, 6, 7}, testutil.EventModRevs(results))
}
type testEvent struct { type testEvent struct {
Rev int64 Rev int64
Key adt.Interval
Timestamp time.Time Timestamp time.Time
Invisible bool
} }
func newTestEvent(rev int64) *testEvent { func newTestEvent(rev int64) *testEvent {
return &testEvent{ return &testEvent{
Rev: rev, Rev: rev,
Timestamp: time.Now(), Timestamp: time.Now(),
Key: adt.NewStringAffinePoint("foo/test"),
} }
} }
func (e *testEvent) GetAge() time.Duration { return time.Since(e.Timestamp) } func (e *testEvent) GetAge() time.Duration { return time.Since(e.Timestamp) }
func (e *testEvent) GetModRev() int64 { return e.Rev } func (e *testEvent) GetRevision() int64 { return e.Rev }
func (e *testEvent) InRange(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 } func (e *testEvent) Matches(query struct{}) bool { return !e.Invisible }
// TODO: Don't use adt here and better tests for InRange

Просмотреть файл

@ -8,8 +8,15 @@ var (
Name: "metaetcd_stale_watch_count", Name: "metaetcd_stale_watch_count",
Help: "Number of stale watch connections.", Help: "Number of stale watch connections.",
}) })
watchEventCount = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "metaetcd_watch_event_count",
Help: "The total watch events that have been pushed into the buffer.",
})
) )
func init() { func init() {
prometheus.MustRegister(staleWatchCount) prometheus.MustRegister(staleWatchCount)
prometheus.MustRegister(watchEventCount)
} }

Просмотреть файл

@ -60,7 +60,7 @@ func (m *Mux) StartWatch(client *clientv3.Client) (*Status, error) {
} }
nextEvent := (resp.Header.Revision + 1) nextEvent := (resp.Header.Revision + 1)
startRev := nextEvent - int64(m.buffer.MaxLen()) startRev := nextEvent - int64(m.buffer.Len())
if startRev < 0 { if startRev < 0 {
startRev = 0 startRev = 0
} }
@ -87,6 +87,7 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) {
for _, event := range events { for _, event := range events {
zap.L().Info("observed watch event", zap.Int64("metaRev", meta)) zap.L().Info("observed watch event", zap.Int64("metaRev", meta))
watchEventCount.Inc()
m.buffer.Push(&eventWrapper{ m.buffer.Push(&eventWrapper{
Event: event, Event: event,
Timestamp: time.Now(), Timestamp: time.Now(),
@ -164,5 +165,5 @@ type eventWrapper struct {
} }
func (e *eventWrapper) GetAge() time.Duration { return time.Since(e.Timestamp) } func (e *eventWrapper) GetAge() time.Duration { return time.Since(e.Timestamp) }
func (e *eventWrapper) GetModRev() int64 { return e.Kv.ModRevision } func (e *eventWrapper) GetRevision() int64 { return e.Kv.ModRevision }
func (e *eventWrapper) InRange(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 } func (e *eventWrapper) Matches(ivl adt.Interval) bool { return ivl.Compare(&e.Key) == 0 }