This commit is contained in:
Jordan Olshevski 2022-07-13 12:02:54 -05:00
Родитель 1a6d4d2afa
Коммит aa55d083d4
6 изменённых файлов: 50 добавлений и 24 удалений

Просмотреть файл

@ -135,6 +135,12 @@ func (s *server) Watch(srv etcdserverpb.Watch_WatchServer) error {
return err
}
if r := msg.GetCreateRequest(); r != nil {
if r.StartRevision == 0 {
r.StartRevision, err = s.now(srv.Context())
if err != nil {
return err
}
}
wg.Go(func() error {
zap.L().Info("adding keyspace to watch connection", zap.String("watchID", id), zap.String("start", string(r.Key)), zap.String("end", string(r.RangeEnd)), zap.Int64("metaRev", r.StartRevision))
s.members.WatchMux.Watch(srv.Context(), r.Key, r.RangeEnd, r.StartRevision, ch)

Просмотреть файл

@ -58,7 +58,7 @@ func TestIntegrationBulk(t *testing.T) {
t.Run("watch from rev", func(t *testing.T) {
watch := client.Watch(watchCtx, "key-", clientv3.WithRange(clientv3.GetPrefixRangeEnd("key-")), clientv3.WithPrevKV(), clientv3.WithRev(lastSeenMetaRev-15))
collectEvents(t, watch, 17)
collectEvents(t, watch, 16)
})
cancel()

Просмотреть файл

@ -27,7 +27,6 @@ func (b *broadcast) Watch(ch chan<- struct{}) func() {
b.mut.Lock()
defer b.mut.Unlock()
b.chs[ch] = ch
// TODO: Close channel?
}()
return func() {

Просмотреть файл

@ -14,6 +14,8 @@ import (
// TODO: Return error for attempts to start watch connection before the oldest message in the buffer
// TODO: Consider watching only the metakey instead of maintaining event buffer for entire keyspace
type buffer struct {
mut sync.Mutex
list *list.List
@ -114,29 +116,51 @@ func (b *buffer) trimUnlocked() {
b.list.Remove(front)
}
func (b *buffer) Range(start int64, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, rev int64) {
func (b *buffer) StartRange(start int64) *list.Element {
b.mut.Lock()
defer b.mut.Unlock()
val := b.list.Front()
for i := 0; true; i++ {
if val == nil {
break
}
e := val.Value.(*eventWrapper)
if i == 0 && e.Kv.ModRevision > start {
break // buffer starts after the requested start rev
}
if e.Kv.ModRevision == start {
return val
}
val = val.Next()
}
return nil
}
// TODO: Somewhere, hold a pointer to current position instead of scanning for start rev every time
func (b *buffer) Range(start *list.Element, ivl adt.IntervalTree) (slice []*mvccpb.Event, n int, pos *list.Element) {
b.mut.Lock()
defer b.mut.Unlock()
var val *list.Element
if start == nil {
val = b.list.Front()
} else {
val = start.Next()
}
for {
if val == nil {
break
}
e := val.Value.(*eventWrapper)
if e.Kv.ModRevision > b.upperBound {
break
}
if e.Kv.ModRevision > start && ivl.Intersects(e.Key) {
if ivl.Intersects(e.Key) {
n++
slice = append(slice, e.Event)
rev = e.Kv.ModRevision
}
next := val.Next()
val = next
pos = val
val = val.Next()
}
return
}
@ -148,7 +172,7 @@ func (b *buffer) bridgeGapUnlocked() (ok, changed bool) {
val = b.list.Front()
}
for {
if val == nil {
if val == nil || val.Value == nil {
break
}
valE := val.Value.(*eventWrapper)

Просмотреть файл

@ -32,7 +32,7 @@ func TestBufferOrdering(t *testing.T) {
// The first event starts at rev 2, wait for the initial gap
b.Push(eventWithModRev(2))
_, n, _ := b.Range(0, defaultKeyRange)
_, n, _ := b.Range(b.StartRange(0), defaultKeyRange)
assert.Equal(t, 0, n)
<-ch
@ -40,24 +40,21 @@ func TestBufferOrdering(t *testing.T) {
b.Push(eventWithModRev(4))
// Full range - but only the first should be returned since there is a gap
buf, n, rev := b.Range(0, defaultKeyRange)
buf, n, _ := b.Range(b.StartRange(0), defaultKeyRange)
assert.Equal(t, 1, n)
assert.Equal(t, int64(2), rev)
assert.Equal(t, []int64{2}, testutil.EventModRevs(buf))
// Fill the gap
b.Push(eventWithModRev(3))
// Full range
buf, n, rev = b.Range(0, defaultKeyRange)
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
assert.Equal(t, 3, n)
assert.Equal(t, int64(4), rev)
assert.Equal(t, []int64{2, 3, 4}, testutil.EventModRevs(buf))
// Partial range
buf, n, rev = b.Range(2, defaultKeyRange)
buf, n, _ = b.Range(b.StartRange(2), defaultKeyRange)
assert.Equal(t, 2, n)
assert.Equal(t, int64(4), rev)
assert.Equal(t, []int64{3, 4}, testutil.EventModRevs(buf))
// Push event to create another gap
@ -65,7 +62,7 @@ func TestBufferOrdering(t *testing.T) {
// This gap is never filled - wait for the timeout
for {
buf, n, _ = b.Range(0, defaultKeyRange)
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
if n == 4 {
break
}
@ -75,7 +72,7 @@ func TestBufferOrdering(t *testing.T) {
// Push another event, which will cause the earliest event to fall off
b.Push(eventWithModRev(7))
buf, n, _ = b.Range(0, defaultKeyRange)
buf, n, _ = b.Range(b.StartRange(0), defaultKeyRange)
assert.Equal(t, 4, n)
assert.Equal(t, []int64{3, 4, 6, 7}, testutil.EventModRevs(buf))
@ -103,7 +100,7 @@ func TestBufferKeyFiltering(t *testing.T) {
Key: []byte("foo/4"),
}}})
slice, _, _ := b.Range(0, keyRange("bar", "bar0"))
slice, _, _ := b.Range(b.StartRange(0), keyRange("bar", "bar0"))
require.Len(t, slice, 2)
assert.Equal(t, []int64{2, 3}, testutil.EventModRevs(slice))
}

Просмотреть файл

@ -67,7 +67,7 @@ func (m *Mux) watchLoop(w clientv3.WatchChan) {
}
}
func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) {
func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *etcdserverpb.WatchResponse) bool {
broadcast := make(chan struct{}, 2)
close := m.bcast.Watch(broadcast)
go func() {
@ -81,16 +81,16 @@ func (m *Mux) Watch(ctx context.Context, key, end []byte, rev int64, ch chan<- *
tree := adt.NewIntervalTree()
tree.Insert(adt.NewStringAffineInterval(string(key), string(end)), nil)
var startingRev int64
pos := m.buffer.StartRange(rev)
var n int
for range broadcast {
resp := &etcdserverpb.WatchResponse{Header: &etcdserverpb.ResponseHeader{}}
resp.Events, n, resp.Header.Revision = m.buffer.Range(startingRev, tree)
resp.Events, n, pos = m.buffer.Range(pos, tree)
if n > 0 {
ch <- resp
startingRev = resp.Header.Revision
}
}
return true
}
type Status struct {