diff --git a/internal/proxysvr/server.go b/internal/proxysvr/server.go index 3a7bf46..cc2f1b0 100644 --- a/internal/proxysvr/server.go +++ b/internal/proxysvr/server.go @@ -135,6 +135,12 @@ func (s *server) Watch(srv etcdserverpb.Watch_WatchServer) error { return err } if r := msg.GetCreateRequest(); r != nil { + if r.StartRevision == 0 { + r.StartRevision, err = s.now(srv.Context()) + if err != nil { + return err + } + } wg.Go(func() error { zap.L().Info("adding keyspace to watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision)) s.members.WatchMux.Watch(srv.Context(), r.Key, r.RangeEnd, r.StartRevision, ch) diff --git a/internal/proxysvr/server_test.go b/internal/proxysvr/server_test.go index e8025e6..3fd8b5c 100644 --- a/internal/proxysvr/server_test.go +++ b/internal/proxysvr/server_test.go @@ -58,7 +58,7 @@ func TestIntegrationBulk(t *testing.T) { t.Run("watch from rev", func(t *testing.T) { watch := client.Watch(watchCtx, "key-", clientv3.WithRange(clientv3.GetPrefixRangeEnd("key-")), clientv3.WithPrevKV(), clientv3.WithRev(lastSeenMetaRev-15)) - collectEvents(t, watch, 17) + collectEvents(t, watch, 16) }) cancel() diff --git a/internal/watch/broadcast.go b/internal/watch/broadcast.go index a63c488..0d3a5d0 100644 --- a/internal/watch/broadcast.go +++ b/internal/watch/broadcast.go @@ -27,7 +27,6 @@ func (b *broadcast) Watch(ch chan<- struct{}) func() { b.mut.Lock() defer b.mut.Unlock() b.chs[ch] = ch - // TODO: Close channel? }() return func() { diff --git a/internal/watch/buffer.go b/internal/watch/buffer.go index 4c65529..f46b7e0 100644 --- a/internal/watch/buffer.go +++ b/internal/watch/buffer.go @@ -14,6 +14,8 @@ import ( // TODO: Return error for attempts to start watch connection before the oldest message in the buffer +// TODO: Consider watching only the metakey instead of maintaining event buffer for entire keyspace + type buffer struct { mut sync.Mutex list *list.List @@ -114,29 +116,51 @@ func (b *buffer) trimUnlocked() { b.list.Remove(front) } -func (b *buffer) Range(start int64, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, rev int64) { +func (b *buffer) StartRange(start int64) *list.Element { b.mut.Lock() defer b.mut.Unlock() + val := b.list.Front() + for i := 0; true; i++ { + if val == nil { + break + } + e := val.Value.(*eventWrapper) + if i == 0 && e.Kv.ModRevision > start { + break // buffer starts after the requested start rev + } + if e.Kv.ModRevision == start { + return val + } + val = val.Next() + } + return nil +} - // TODO: Somewhere, hold a pointer to current position instead of scanning for start rev every time +func (b *buffer) Range(start *list.Element, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, pos *list.Element) { + b.mut.Lock() + defer b.mut.Unlock() + var val *list.Element + if start == nil { + val = b.list.Front() + } else { + val = start.Next() + } for { if val == nil { break } - e := val.Value.(*eventWrapper) if e.Kv.ModRevision > b.upperBound { break } - if e.Kv.ModRevision > start && ivl.Intersects(e.Key) { + if ivl.Intersects(e.Key) { n++ slice = append(slice, e.Event) - rev = e.Kv.ModRevision } - next := val.Next() - val = next + pos = val + val = val.Next() } return } @@ -148,7 +172,7 @@ func (b *buffer) bridgeGapUnlocked() (ok, changed bool) { val = b.list.Front() } for { - if val == nil { + if val == nil || val.Value == nil { break } valE := val.Value.(*eventWrapper) diff --git a/internal/watch/buffer_test.go b/internal/watch/buffer_test.go index d485342..e084ad8 100644 --- a/internal/watch/buffer_test.go +++ b/internal/watch/buffer_test.go @@ -32,7 +32,7 @@ func TestBufferOrdering(t *testing.T) { // The first event starts at rev 2, wait for the initial gap b.Push(eventWithModRev(2)) - _, n, _ := b.Range(0, defaultKeyRange) + _, n, _ := b.Range(b.StartRange(0), defaultKeyRange) assert.Equal(t, 0, n) <-ch @@ -40,24 +40,21 @@ func TestBufferOrdering(t *testing.T) { b.Push(eventWithModRev(4)) // Full range - but only the first should be returned since there is a gap - buf, n, rev := b.Range(0, defaultKeyRange) + buf, n, _ := b.Range(b.StartRange(0), defaultKeyRange) assert.Equal(t, 1, n) - assert.Equal(t, int64(2), rev) assert.Equal(t, []int64{2}, testutil.EventModRevs(buf)) // Fill the gap b.Push(eventWithModRev(3)) // Full range - buf, n, rev = b.Range(0, defaultKeyRange) + buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) assert.Equal(t, 3, n) - assert.Equal(t, int64(4), rev) assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf)) // Partial range - buf, n, rev = b.Range(2, defaultKeyRange) + buf, n, _ = b.Range(b.StartRange(2), defaultKeyRange) assert.Equal(t, 2, n) - assert.Equal(t, int64(4), rev) assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf)) // Push event to create another gap @@ -65,7 +62,7 @@ func TestBufferOrdering(t *testing.T) { // This gap is never filled - wait for the timeout for { - buf, n, _ = b.Range(0, defaultKeyRange) + buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) if n == 4 { break } @@ -75,7 +72,7 @@ func TestBufferOrdering(t *testing.T) { // Push another event, which will cause the earliest event to fall off b.Push(eventWithModRev(7)) - buf, n, _ = b.Range(0, defaultKeyRange) + buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange) assert.Equal(t, 4, n) assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf)) @@ -103,7 +100,7 @@ func TestBufferKeyFiltering(t *testing.T) { Key: []byte("foo/4"), }}}) - slice, _, _ := b.Range(0, keyRange("bar", "bar0")) + slice, _, _ := b.Range(b.StartRange(0), keyRange("bar", "bar0")) require.Len(t, slice, 2) assert.Equal(t, []int64{2, 3}, testutil.EventModRevs(slice)) } diff --git a/internal/watch/mux.go b/internal/watch/mux.go index ee40993..46bb7bf 100644 --- a/internal/watch/mux.go +++ b/internal/watch/mux.go @@ -67,7 +67,7 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) { } } -func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) { +func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) bool { broadcast := make(chan struct{}, 2) close := m.bcast.Watch(broadcast) go func() { @@ -81,16 +81,16 @@ func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- * tree := adt.NewIntervalTree() tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil) - var startingRev int64 + pos := m.buffer.StartRange(rev) var n int for range broadcast { resp := &etcdserverpb.WatchResponse{Header: &etcdserverpb.ResponseHeader{}} - resp.Events, n, resp.Header.Revision = m.buffer.Range(startingRev, tree) + resp.Events, n, pos = m.buffer.Range(pos, tree) if n > 0 { ch <- resp - startingRev = resp.Header.Revision } } + return true } type Status struct {