This commit is contained in:
Jordan Olshevski 2022-07-19 12:13:03 -05:00
Родитель e12d1b0fd9
Коммит 32a21196bb
2 изменённых файлов: 30 добавлений и 8 удалений

Просмотреть файл

@ -58,7 +58,6 @@ func (b *buffer) Push(events []*clientv3.Event) {
func (b *buffer) pushOrDeferUnlocked(event *mvccpb.Event) bool {
b.pushUnlocked(event)
ok, _ := b.bridgeGapUnlocked()
b.trimUnlocked()
if ok {
b.bcast.Send()
}
@ -106,14 +105,19 @@ func (b *buffer) pushUnlocked(event *mvccpb.Event) {
}
func (b *buffer) trimUnlocked() {
if b.list.Len() <= b.maxLen {
return
item := b.list.Front()
for {
if b.list.Len() <= b.maxLen || item == nil {
return
}
event := item.Value.(*eventWrapper)
next := item.Next()
if b.upperBound > event.Kv.ModRevision {
b.list.Remove(item)
}
item = next
}
front := b.list.Front()
if front == b.upperVal {
return // don't trim events until the gap has been filled
}
b.list.Remove(front)
}
func (b *buffer) StartRange(start int64) *list.Element {
@ -202,6 +206,7 @@ func (b *buffer) bridgeGapUnlocked() (ok, changed bool) {
continue
}
b.trimUnlocked()
return ok, changed
}

Просмотреть файл

@ -117,6 +117,23 @@ func TestBufferBridgeGap(t *testing.T) {
}
}
func TestBufferTrimWhenGap(t *testing.T) {
bcast := newBroadcast()
b := newBuffer(time.Millisecond, 2, bcast)
// Fill the buffer and more
const n = 10
for i := 0; i < n; i++ {
b.Push(eventWithModRev(int64(i + 3)))
}
assert.Equal(t, n, b.list.Len())
// Bridge the gap and prove the buffer was shortened
time.Sleep(time.Millisecond * 2)
b.bridgeGapUnlocked()
assert.Equal(t, 2, b.list.Len())
}
func eventWithModRev(rev int64) []*clientv3.Event {
return []*clientv3.Event{{Kv: &mvccpb.KeyValue{Key: []byte("foo/test"), ModRevision: rev}}}
}