From 2d8ca7222bcba5cd65e2190262bb7cefea0f3986 Mon Sep 17 00:00:00 2001 From: Jordan Olshevski Date: Mon, 18 Jul 2022 14:08:23 -0500 Subject: [PATCH] Reject watches that start too early --- internal/proxysvr/metrics.go | 7 +++++++ internal/proxysvr/server.go | 13 ++++++++++-- internal/watch/buffer.go | 38 +++++++++++++++++++++++------------ internal/watch/buffer_test.go | 23 ++++++++++++++------- internal/watch/mux.go | 33 +++++++++++++++++++++--------- 5 files changed, 83 insertions(+), 31 deletions(-) diff --git a/internal/proxysvr/metrics.go b/internal/proxysvr/metrics.go index 1eff49c..8090155 100644 --- a/internal/proxysvr/metrics.go +++ b/internal/proxysvr/metrics.go @@ -22,10 +22,17 @@ var ( Name: "metaetcd_active_watch_count", Help: "Number of active watch connections.", }) + + watchLateStartCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "metaetcd_watch_late_start_count", + Help: "Number of total watch requests with revisions that are too late.", + }) ) func init() { prometheus.MustRegister(requestCount) prometheus.MustRegister(getMemberRevDepth) prometheus.MustRegister(activeWatchCount) + prometheus.MustRegister(watchLateStartCount) } diff --git a/internal/proxysvr/server.go b/internal/proxysvr/server.go index f425361..8831670 100644 --- a/internal/proxysvr/server.go +++ b/internal/proxysvr/server.go @@ -186,9 +186,18 @@ func (s *server) Watch(srv etcdserverpb.Watch_WatchServer) error { return err } } + + watch := s.members.WatchMux.Watch(r.Key, r.RangeEnd, r.StartRevision) + if watch == nil { + zap.L().Warn("watch start revision is too early", zap.Int64("rev", r.StartRevision)) + watchLateStartCount.Inc() + ch <- &etcdserverpb.WatchResponse{WatchId: r.WatchId, Created: false, Header: &etcdserverpb.ResponseHeader{}} + return nil + } + wg.Go(func() error { - zap.L().Info("adding keyspace to watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision)) - s.members.WatchMux.Watch(srv.Context(), r.Key, r.RangeEnd, r.StartRevision, ch) + zap.L().Info("starting watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision)) + watch.Run(srv.Context(), ch) return nil }) ch <- &etcdserverpb.WatchResponse{WatchId: r.WatchId, Created: true, Header: &etcdserverpb.ResponseHeader{}} diff --git a/internal/watch/buffer.go b/internal/watch/buffer.go index 0410c3a..7575d27 100644 --- a/internal/watch/buffer.go +++ b/internal/watch/buffer.go @@ -12,22 +12,20 @@ import ( "go.uber.org/zap" ) -// TODO: Return error for attempts to start watch connection before the oldest message in the buffer - // TODO: Consider watching only the metakey instead of maintaining event buffer for entire keyspace type buffer struct { - mut sync.Mutex - list *list.List - gapTimeout time.Duration - maxLen int - upperBound int64 - bcast *broadcast - upperVal *list.Element + mut sync.Mutex + list *list.List + gapTimeout time.Duration + maxLen int + lowerBound, upperBound int64 + bcast *broadcast + upperVal *list.Element } func newBuffer(gapTimeout time.Duration, maxLen int, bcast *broadcast) *buffer { - return &buffer{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, bcast: bcast} + return &buffer{list: list.New(), gapTimeout: gapTimeout, maxLen: maxLen, bcast: bcast, lowerBound: -1} } func (b *buffer) Run(ctx context.Context) { @@ -113,13 +111,24 @@ func (b *buffer) trimUnlocked() { if front == b.upperVal { return // don't trim events until the gap has been filled } + + next := front.Next() + if next != nil { + newFront := next.Value.(*eventWrapper) + b.lowerBound = newFront.Kv.ModRevision + } + b.list.Remove(front) } -func (b *buffer) StartRange(start int64) *list.Element { +func (b *buffer) StartRange(start int64) (*list.Element, bool) { b.mut.Lock() defer b.mut.Unlock() + if start < b.lowerBound { + return nil, false + } + val := b.list.Front() for i := 0; true; i++ { if val == nil { @@ -130,11 +139,11 @@ func (b *buffer) StartRange(start int64) *list.Element { break // buffer starts after the requested start rev } if e.Kv.ModRevision == start { - return val + return val, true } val = val.Next() } - return nil + return nil, true } func (b *buffer) Range(start *list.Element, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, pos *list.Element) { @@ -193,6 +202,9 @@ func (b *buffer) bridgeGapUnlocked() (ok, changed bool) { } b.upperBound = valE.Kv.ModRevision + if b.lowerBound == -1 { + b.lowerBound = valE.Kv.ModRevision + } currentWatchRev.Set(float64(b.upperBound)) watchLatency.Observe(age.Seconds()) diff --git a/internal/watch/buffer_test.go b/internal/watch/buffer_test.go index e084ad8..42cf29d 100644 --- a/internal/watch/buffer_test.go +++ b/internal/watch/buffer_test.go @@ -1,6 +1,7 @@ package watch import ( + "container/list" "context" "testing" "time" @@ -32,7 +33,7 @@ func TestBufferOrdering(t *testing.T) { // The first event starts at rev 2, wait for the initial gap b.Push(eventWithModRev(2)) - _, n, _ := b.Range(b.StartRange(0), defaultKeyRange) + _, n, _ := b.Range(startRange(b, 0), defaultKeyRange) assert.Equal(t, 0, n) <-ch @@ -40,7 +41,7 @@ func TestBufferOrdering(t *testing.T) { b.Push(eventWithModRev(4)) // Full range - but only the first should be returned since there is a gap - buf, n, _ := b.Range(b.StartRange(0), defaultKeyRange) + buf, n, _ := b.Range(startRange(b, 0), defaultKeyRange) assert.Equal(t, 1, n) assert.Equal(t, []int64{2}, testutil.EventModRevs(buf)) @@ -48,12 +49,12 @@ func TestBufferOrdering(t *testing.T) { b.Push(eventWithModRev(3)) // Full range - buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) + buf, n, _ = b.Range(startRange(b, 0), defaultKeyRange) assert.Equal(t, 3, n) assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf)) // Partial range - buf, n, _ = b.Range(b.StartRange(2), defaultKeyRange) + buf, n, _ = b.Range(startRange(b, 2), defaultKeyRange) assert.Equal(t, 2, n) assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf)) @@ -62,7 +63,7 @@ func TestBufferOrdering(t *testing.T) { // This gap is never filled - wait for the timeout for { - buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) + buf, n, _ = b.Range(startRange(b, 0), defaultKeyRange) if n == 4 { break } @@ -72,10 +73,13 @@ func TestBufferOrdering(t *testing.T) { // Push another event, which will cause the earliest event to fall off b.Push(eventWithModRev(7)) - buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) + buf, n, _ = b.Range(startRange(b, 2), defaultKeyRange) assert.Equal(t, 4, n) assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf)) + // Prove we can't start a range at a revision that has been trimmed + assert.Nil(t, startRange(b, 0)) + cancel() <-done } @@ -100,7 +104,7 @@ func TestBufferKeyFiltering(t *testing.T) { Key: []byte("foo/4"), }}}) - slice, _, _ := b.Range(b.StartRange(0), keyRange("bar", "bar0")) + slice, _, _ := b.Range(startRange(b, 0), keyRange("bar", "bar0")) require.Len(t, slice, 2) assert.Equal(t, []int64{2, 3}, testutil.EventModRevs(slice)) } @@ -128,3 +132,8 @@ func keyRange(from, to string) adt.IntervalTree { } var defaultKeyRange = keyRange("foo", "foo0") + +func startRange(b *buffer, rev int64) *list.Element { + el, _ := b.StartRange(rev) + return el +} diff --git a/internal/watch/mux.go b/internal/watch/mux.go index 46bb7bf..bc06a46 100644 --- a/internal/watch/mux.go +++ b/internal/watch/mux.go @@ -1,6 +1,7 @@ package watch import ( + "container/list" "context" "fmt" "time" @@ -67,9 +68,28 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) { } } -func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) bool { +func (m *Mux) Watch(key, end []byte, rev int64) *Watch { + pos, ok := m.buffer.StartRange(rev) + if !ok { + return nil + } + + // TODO: Consider one tree per incoming watch connection (like etcd does) + tree := adt.NewIntervalTree() + tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil) + + return &Watch{m: m, pos: pos, tree: tree} +} + +type Watch struct { + m *Mux + pos *list.Element + tree adt.IntervalTree +} + +func (w *Watch) Run(ctx context.Context, ch chan<- *etcdserverpb.WatchResponse) *Status { broadcast := make(chan struct{}, 2) - close := m.bcast.Watch(broadcast) + close := w.m.bcast.Watch(broadcast) go func() { <-ctx.Done() close() @@ -77,20 +97,15 @@ func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- * broadcast <- struct{}{} - // TODO: Consider one tree per incoming watch connection (like etcd does) - tree := adt.NewIntervalTree() - tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil) - - pos := m.buffer.StartRange(rev) var n int for range broadcast { resp := &etcdserverpb.WatchResponse{Header: &etcdserverpb.ResponseHeader{}} - resp.Events, n, pos = m.buffer.Range(pos, tree) + resp.Events, n, w.pos = w.m.buffer.Range(w.pos, w.tree) if n > 0 { ch <- resp } } - return true + return nil } type Status struct {