Reject watches that start too early

Jordan Olshevski 2022-07-18 14:08:23 -05:00
Parent 9a218be335
Commit 2d8ca7222b
5 changed files with 83 additions and 31 deletions
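The change, in short: the watch event buffer now remembers the oldest revision it still holds, and a watch that asks to start below that revision is rejected up front instead of silently missing trimmed events. A minimal, self-contained Go sketch of the idea (hypothetical types, not the project's actual buffer):

package main

import "fmt"

// eventBuffer is a hypothetical stand-in for metaetcd's watch buffer.
// It holds the revisions of buffered events and the oldest revision
// that has not been trimmed away (lowerBound).
type eventBuffer struct {
	revs       []int64
	lowerBound int64
}

// startRange reports ok=false when the requested start revision is older
// than the oldest retained event, meaning the watch would miss events.
func (b *eventBuffer) startRange(start int64) (idx int, ok bool) {
	if start < b.lowerBound {
		return 0, false
	}
	for i, rev := range b.revs {
		if rev > start {
			return i, true // first event newer than the requested start
		}
	}
	return len(b.revs), true // nothing newer buffered yet
}

func main() {
	b := &eventBuffer{revs: []int64{5, 6, 7}, lowerBound: 5}

	if _, ok := b.startRange(3); !ok {
		fmt.Println("watch start revision is too early - reject")
	}
	if idx, ok := b.startRange(5); ok {
		fmt.Println("start streaming from index", idx) // index 1 (revision 6)
	}
}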

View file

@@ -22,10 +22,17 @@ var (
Name: "metaetcd_active_watch_count",
Help: "Number of active watch connections.",
})
watchLateStartCount = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "metaetcd_watch_late_start_count",
Help: "Number of total watch requests with revisions that are too late.",
})
)
func init() {
prometheus.MustRegister(requestCount)
prometheus.MustRegister(getMemberRevDepth)
prometheus.MustRegister(activeWatchCount)
prometheus.MustRegister(watchLateStartCount)
}
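For reference, a standalone sketch of the lifecycle of a counter like the one added here, using prometheus/client_golang; the testutil read at the end is only for demonstration and is not part of the commit:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// The counter is registered once at startup and incremented whenever a
// watch is rejected for starting at an already-trimmed revision.
var lateStarts = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "metaetcd_watch_late_start_count",
	Help: "Number of total watch requests with revisions that are too late.",
})

func main() {
	prometheus.MustRegister(lateStarts)

	// Simulate one rejected watch request.
	lateStarts.Inc()

	fmt.Println(testutil.ToFloat64(lateStarts)) // 1
}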

View file

@@ -186,9 +186,18 @@ func (s *server) Watch(srv etcdserverpb.Watch_WatchServer) error {
return err
}
}
watch := s.members.WatchMux.Watch(r.Key, r.RangeEnd, r.StartRevision)
if watch == nil {
zap.L().Warn("watch start revision is too early", zap.Int64("rev", r.StartRevision))
watchLateStartCount.Inc()
ch <- &etcdserverpb.WatchResponse{WatchId: r.WatchId, Created: false, Header: &etcdserverpb.ResponseHeader{}}
return nil
}
wg.Go(func() error {
zap.L().Info("adding keyspace to watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision))
s.members.WatchMux.Watch(srv.Context(), r.Key, r.RangeEnd, r.StartRevision, ch)
zap.L().Info("starting watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision))
watch.Run(srv.Context(), ch)
return nil
})
ch <- &etcdserverpb.WatchResponse{WatchId: r.WatchId, Created: true, Header: &etcdserverpb.ResponseHeader{}}

View file

@@ -12,22 +12,20 @@ import (
"go.uber.org/zap"
)
// TODO: Return error for attempts to start watch connection before the oldest message in the buffer
// TODO: Consider watching only the metakey instead of maintaining event buffer for entire keyspace
type buffer struct {
mut sync.Mutex
list *list.List
gapTimeout time.Duration
maxLen int
upperBound int64
bcast *broadcast
upperVal *list.Element
mut sync.Mutex
list *list.List
gapTimeout time.Duration
maxLen int
lowerBound, upperBound int64
bcast *broadcast
upperVal *list.Element
}
func newBuffer(gapTimeout time.Duration, maxLen int, bcast *broadcast) *buffer {
return &buffer{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, bcast: bcast}
return &buffer{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, bcast: bcast, lowerBound: -1}
}
func (b *buffer) Run(ctx context.Context) {
@@ -113,13 +111,24 @@ func (b *buffer) trimUnlocked() {
if front == b.upperVal {
return // don't trim events until the gap has been filled
}
next := front.Next()
if next != nil {
newFront := next.Value.(*eventWrapper)
b.lowerBound = newFront.Kv.ModRevision
}
b.list.Remove(front)
}
func (b *buffer) StartRange(start int64) *list.Element {
func (b *buffer) StartRange(start int64) (*list.Element, bool) {
b.mut.Lock()
defer b.mut.Unlock()
if start < b.lowerBound {
return nil, false
}
val := b.list.Front()
for i := 0; true; i++ {
if val == nil {
@@ -130,11 +139,11 @@ func (b *buffer) StartRange(start int64) *list.Element {
break // buffer starts after the requested start rev
}
if e.Kv.ModRevision == start {
return val
return val, true
}
val = val.Next()
}
return nil
return nil, true
}
func (b *buffer) Range(start *list.Element, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, pos *list.Element) {
@@ -193,6 +202,9 @@ func (b *buffer) bridgeGapUnlocked() (ok, changed bool) {
}
b.upperBound = valE.Kv.ModRevision
if b.lowerBound == -1 {
b.lowerBound = valE.Kv.ModRevision
}
currentWatchRev.Set(float64(b.upperBound))
watchLatency.Observe(age.Seconds())
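To make the trim bookkeeping above concrete, here is a small self-contained sketch with placeholder types: when the oldest event is dropped, the lower bound advances to the revision of the new front, which is what lets StartRange detect starts that are now too early.

package main

import (
	"container/list"
	"fmt"
)

// event is a hypothetical stand-in for the buffer's eventWrapper.
type event struct{ rev int64 }

func main() {
	l := list.New()
	for _, r := range []int64{2, 3, 4} {
		l.PushBack(&event{rev: r})
	}
	lowerBound := int64(2)

	// Drop the oldest event and advance the lower bound to the new front,
	// mirroring the bookkeeping added to trimUnlocked.
	front := l.Front()
	if next := front.Next(); next != nil {
		lowerBound = next.Value.(*event).rev
	}
	l.Remove(front)

	fmt.Println("lowerBound after trim:", lowerBound) // 3
}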

View file

@@ -1,6 +1,7 @@
package watch
import (
"container/list"
"context"
"testing"
"time"
@@ -32,7 +33,7 @@ func TestBufferOrdering(t *testing.T) {
// The first event starts at rev 2, wait for the initial gap
b.Push(eventWithModRev(2))
_, n, _ := b.Range(b.StartRange(0), defaultKeyRange)
_, n, _ := b.Range(startRange(b, 0), defaultKeyRange)
assert.Equal(t, 0, n)
<-ch
@@ -40,7 +41,7 @@ func TestBufferOrdering(t *testing.T) {
b.Push(eventWithModRev(4))
// Full range - but only the first should be returned since there is a gap
buf, n, _ := b.Range(b.StartRange(0), defaultKeyRange)
buf, n, _ := b.Range(startRange(b, 0), defaultKeyRange)
assert.Equal(t, 1, n)
assert.Equal(t, []int64{2}, testutil.EventModRevs(buf))
@@ -48,12 +49,12 @@ func TestBufferOrdering(t *testing.T) {
b.Push(eventWithModRev(3))
// Full range
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
buf, n, _ = b.Range(startRange(b, 0), defaultKeyRange)
assert.Equal(t, 3, n)
assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf))
// Partial range
buf, n, _ = b.Range(b.StartRange(2), defaultKeyRange)
buf, n, _ = b.Range(startRange(b, 2), defaultKeyRange)
assert.Equal(t, 2, n)
assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf))
@@ -62,7 +63,7 @@ func TestBufferOrdering(t *testing.T) {
// This gap is never filled - wait for the timeout
for {
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
buf, n, _ = b.Range(startRange(b, 0), defaultKeyRange)
if n == 4 {
break
}
@@ -72,10 +73,13 @@ func TestBufferOrdering(t *testing.T) {
// Push another event, which will cause the earliest event to fall off
b.Push(eventWithModRev(7))
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
buf, n, _ = b.Range(startRange(b, 2), defaultKeyRange)
assert.Equal(t, 4, n)
assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf))
// Prove we can't start a range at a revision that has been trimmed
assert.Nil(t, startRange(b, 0))
cancel()
<-done
}
@@ -100,7 +104,7 @@ func TestBufferKeyFiltering(t *testing.T) {
Key: []byte("foo/4"),
}}})
slice, _, _ := b.Range(b.StartRange(0), keyRange("bar", "bar0"))
slice, _, _ := b.Range(startRange(b, 0), keyRange("bar", "bar0"))
require.Len(t, slice, 2)
assert.Equal(t, []int64{2, 3}, testutil.EventModRevs(slice))
}
@@ -128,3 +132,8 @@ func keyRange(from, to string) adt.IntervalTree {
}
var defaultKeyRange = keyRange("foo", "foo0")
func startRange(b *buffer, rev int64) *list.Element {
el, _ := b.StartRange(rev)
return el
}

View file

@@ -1,6 +1,7 @@
package watch
import (
"container/list"
"context"
"fmt"
"time"
@@ -67,9 +68,28 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) {
}
}
func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) bool {
func (m *Mux) Watch(key, end []byte, rev int64) *Watch {
pos, ok := m.buffer.StartRange(rev)
if !ok {
return nil
}
// TODO: Consider one tree per incoming watch connection (like etcd does)
tree := adt.NewIntervalTree()
tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil)
return &Watch{m: m, pos: pos, tree: tree}
}
type Watch struct {
m *Mux
pos *list.Element
tree adt.IntervalTree
}
func (w *Watch) Run(ctx context.Context, ch chan<- *etcdserverpb.WatchResponse) *Status {
broadcast := make(chan struct{}, 2)
close := m.bcast.Watch(broadcast)
close := w.m.bcast.Watch(broadcast)
go func() {
<-ctx.Done()
close()
@@ -77,20 +97,15 @@ func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) bool {
broadcast <- struct{}{}
// TODO: Consider one tree per incoming watch connection (like etcd does)
tree := adt.NewIntervalTree()
tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil)
pos := m.buffer.StartRange(rev)
var n int
for range broadcast {
resp := &etcdserverpb.WatchResponse{Header: &etcdserverpb.ResponseHeader{}}
resp.Events, n, pos = m.buffer.Range(pos, tree)
resp.Events, n, w.pos = w.m.buffer.Range(w.pos, w.tree)
if n > 0 {
ch <- resp
}
}
return true
return nil
}
type Status struct {
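Taken together, Mux.Watch is now a cheap, non-blocking call that either resolves a start position or rejects the request, while the blocking event loop lives in Watch.Run. A hypothetical, self-contained sketch of how a caller consumes that split (stand-in types that only mirror the shape of the API, not the real Mux):

package main

import (
	"context"
	"fmt"
	"time"
)

// Watch and Mux are hypothetical stand-ins: Watch returns nil when the
// start revision has already been trimmed, and Run drives the blocking
// event loop.
type Watch struct{ startRev int64 }

func (w *Watch) Run(ctx context.Context, ch chan<- string) {
	// Placeholder loop; the real Run ranges over the buffer on each
	// broadcast until the context is cancelled.
	<-ctx.Done()
}

type Mux struct{ lowerBound int64 }

func (m *Mux) Watch(key, end []byte, rev int64) *Watch {
	if rev < m.lowerBound {
		return nil // start revision predates the oldest buffered event
	}
	return &Watch{startRev: rev}
}

func main() {
	m := &Mux{lowerBound: 5}
	ch := make(chan string, 1)

	// Too-early start: rejected synchronously, before any goroutine is spawned.
	if w := m.Watch([]byte("foo"), []byte("foo0"), 3); w == nil {
		fmt.Println("rejecting watch: start revision is too early")
	}

	// Valid start: hand the blocking loop its own goroutine, as the server does.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	done := make(chan struct{})
	if w := m.Watch([]byte("foo"), []byte("foo0"), 7); w != nil {
		go func() {
			w.Run(ctx, ch)
			close(done)
		}()
	}
	<-done
}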