// Copyright 2013 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package leveldb

import (
	"fmt"

	"github.com/golang/leveldb/db"
	"github.com/golang/leveldb/table"
)

const (
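	// targetFileSize is the nominal size of a single output table (2 MiB);
	// the two limits below are expressed as multiples of it.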
	targetFileSize = 2 * 1024 * 1024

	// maxGrandparentOverlapBytes is the maximum bytes of overlap with
	// level+2 before we stop building a single file in a level to level+1
	// compaction.
	maxGrandparentOverlapBytes = 10 * targetFileSize

	// expandedCompactionByteSizeLimit is the maximum number of bytes in
	// all compacted files. We avoid expanding the lower level file set of
	// a compaction if it would make the total compaction cover more than
	// this many bytes.
	expandedCompactionByteSizeLimit = 25 * targetFileSize
)
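
// The constants above match the classic C++ LevelDB defaults: with a 2 MiB
// target file size, maxGrandparentOverlapBytes works out to 20 MiB and
// expandedCompactionByteSizeLimit to 50 MiB.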

// compaction is a table compaction from one level to the next, starting from a
// given version.
type compaction struct {
	version *version

	// level is the level that is being compacted. Inputs from level and
	// level+1 will be merged to produce a set of level+1 files.
	level int

	// inputs are the tables to be compacted.
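	// inputs[0] holds the files at level, and inputs[1] the overlapping files
	// at level+1; together they are merged into new level+1 tables.
	// inputs[2] holds the overlapping "grandparent" files at level+2, which
	// are not rewritten but only bound how far the compaction may grow
	// (see maxGrandparentOverlapBytes).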
	inputs [3][]fileMetadata
}

// pickCompaction picks the best compaction, if any, for vs' current version.
func pickCompaction(vs *versionSet) (c *compaction) {
	cur := vs.currentVersion()

	// Pick a compaction based on size. If none exist, pick one based on seeks.
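	// A version's compactionScore is, roughly, the ratio of a level's total
	// byte size (or, for level 0, its file count) to that level's allowed
	// maximum, so a score of 1 or more means the level is over target.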
	if cur.compactionScore >= 1 {
		c = &compaction{
			version: cur,
			level:   cur.compactionLevel,
		}
		// TODO: Pick the first file that comes after the compaction pointer for c.level.
		c.inputs[0] = []fileMetadata{cur.files[c.level][0]}

	} else if false {
		// TODO: look for a compaction triggered by seeks.

	} else {
		return nil
	}

	// Files in level 0 may overlap each other, so pick up all overlapping ones.
	if c.level == 0 {
		smallest, largest := ikeyRange(vs.icmp, c.inputs[0], nil)
		c.inputs[0] = cur.overlaps(0, vs.ucmp, smallest.ukey(), largest.ukey())
		if len(c.inputs[0]) == 0 {
			panic("leveldb: empty compaction")
		}
	}

	c.setupOtherInputs(vs)
	return c
}

// TODO: user initiated compactions.

// setupOtherInputs fills in the rest of the compaction inputs, regardless of
// whether the compaction was automatically scheduled or user initiated.
func (c *compaction) setupOtherInputs(vs *versionSet) {
	smallest0, largest0 := ikeyRange(vs.icmp, c.inputs[0], nil)
	c.inputs[1] = c.version.overlaps(c.level+1, vs.ucmp, smallest0.ukey(), largest0.ukey())
	smallest01, largest01 := ikeyRange(vs.icmp, c.inputs[0], c.inputs[1])

	// Grow the inputs if it doesn't affect the number of level+1 files.
	if c.grow(vs, smallest01, largest01) {
		smallest01, largest01 = ikeyRange(vs.icmp, c.inputs[0], c.inputs[1])
	}

	// Compute the set of level+2 files that overlap this compaction.
	if c.level+2 < numLevels {
		c.inputs[2] = c.version.overlaps(c.level+2, vs.ucmp, smallest01.ukey(), largest01.ukey())
	}

	// TODO: update the compaction pointer for c.level.
}

// grow grows the number of inputs at c.level without changing the number of
// c.level+1 files in the compaction, and returns whether the inputs grew. sm
// and la are the smallest and largest internalKeys in all of the inputs.
func (c *compaction) grow(vs *versionSet, sm, la internalKey) bool {
	if len(c.inputs[1]) == 0 {
		return false
	}
	grow0 := c.version.overlaps(c.level, vs.ucmp, sm.ukey(), la.ukey())
	if len(grow0) <= len(c.inputs[0]) {
		return false
	}
	if totalSize(grow0)+totalSize(c.inputs[1]) >= expandedCompactionByteSizeLimit {
		return false
	}
	sm1, la1 := ikeyRange(vs.icmp, grow0, nil)
	grow1 := c.version.overlaps(c.level+1, vs.ucmp, sm1.ukey(), la1.ukey())
	if len(grow1) != len(c.inputs[1]) {
		return false
	}
	c.inputs[0] = grow0
	c.inputs[1] = grow1
	return true
}
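
// A hypothetical illustration of grow: if inputs[0] is {A} and inputs[1] is
// {X, Y}, and another file B at the same level overlaps the key range spanned
// by A, X and Y, then grow pulls B into inputs[0], provided doing so drags in
// no additional level+1 files and the expanded compaction stays under
// expandedCompactionByteSizeLimit.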

// isBaseLevelForUkey reports whether it is guaranteed that there are no
// key/value pairs at c.level+2 or higher that have the user key ukey.
func (c *compaction) isBaseLevelForUkey(userCmp db.Comparer, ukey []byte) bool {
	// TODO: this can be faster if ukey is always increasing between successive
	// isBaseLevelForUkey calls and we can keep some state in between calls.
	for level := c.level + 2; level < numLevels; level++ {
		for _, f := range c.version.files[level] {
			if userCmp.Compare(ukey, f.largest.ukey()) <= 0 {
				if userCmp.Compare(ukey, f.smallest.ukey()) >= 0 {
					return false
				}
				// For levels above level 0, the files within a level are in
				// increasing ikey order, so we can break early.
				break
			}
		}
	}
	return true
}

// maybeScheduleCompaction schedules a compaction if necessary.
//
// d.mu must be held when calling this.
func (d *DB) maybeScheduleCompaction() {
	if d.compacting || d.closed {
		return
	}
	// TODO: check for manual compactions.
	if d.imm == nil {
		v := d.versions.currentVersion()
		// TODO: check v.fileToCompact.
		if v.compactionScore < 1 {
			// There is no work to be done.
			return
		}
	}
	d.compacting = true
	go d.compact()
}

// compact runs one compaction and maybe schedules another call to compact.
func (d *DB) compact() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if err := d.compact1(); err != nil {
		// TODO: count consecutive compaction errors and backoff.
	}
	d.compacting = false
	// The previous compaction may have produced too many files in a
	// level, so reschedule another compaction if needed.
	d.maybeScheduleCompaction()
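	// Wake any goroutines blocked waiting for a compaction to finish (for
	// example, a writer waiting for the immutable memtable to be flushed)
	// so that they can re-check their condition.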
	d.compactionCond.Broadcast()
}

// compact1 runs one compaction.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compact1() error {
	if d.imm != nil {
		return d.compactMemTable()
	}

	// TODO: support manual compactions.

	c := pickCompaction(&d.versions)
	if c == nil {
		return nil
	}

	// Check for a trivial move of one table from one level to the next.
	// We avoid such a move if there is lots of overlapping grandparent data.
	// Otherwise, the move could create a parent file that will require
	// a very expensive merge later on.
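	// A trivial move re-assigns the existing table file to level+1 in the
	// version edit below; the file's contents are not rewritten.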
	if len(c.inputs[0]) == 1 && len(c.inputs[1]) == 0 &&
		totalSize(c.inputs[2]) <= maxGrandparentOverlapBytes {

		meta := &c.inputs[0][0]
		return d.versions.logAndApply(d.dirname, &versionEdit{
			deletedFiles: map[deletedFileEntry]bool{
				deletedFileEntry{level: c.level, fileNum: meta.fileNum}: true,
			},
			newFiles: []newFileEntry{
				{level: c.level + 1, meta: *meta},
			},
		})
	}

	ve, pendingOutputs, err := d.compactDiskTables(c)
	if err != nil {
		return err
	}
	err = d.versions.logAndApply(d.dirname, ve)
	for _, fileNum := range pendingOutputs {
		delete(d.pendingOutputs, fileNum)
	}
	if err != nil {
		return err
	}
	d.deleteObsoleteFiles()
	return nil
}

// compactMemTable runs a compaction that copies d.imm from memory to disk.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compactMemTable() error {
	meta, err := d.writeLevel0Table(d.opts.GetFileSystem(), d.imm)
	if err != nil {
		return err
	}
	err = d.versions.logAndApply(d.dirname, &versionEdit{
		logNumber: d.logNumber,
		newFiles: []newFileEntry{
			{level: 0, meta: meta},
		},
	})
	delete(d.pendingOutputs, meta.fileNum)
	if err != nil {
		return err
	}
	d.imm = nil
	d.deleteObsoleteFiles()
	return nil
}

// compactDiskTables runs a compaction that produces new on-disk tables from
// old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, pendingOutputs []uint64, retErr error) {
	defer func() {
		if retErr != nil {
			for _, fileNum := range pendingOutputs {
				delete(d.pendingOutputs, fileNum)
			}
			pendingOutputs = nil
		}
	}()

	// TODO: track snapshots.
	smallestSnapshot := d.versions.lastSequence

	// Release the d.mu lock while doing I/O.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	iter, err := compactionIterator(&d.tableCache, d.icmp, c)
	if err != nil {
		return nil, pendingOutputs, err
	}

	// TODO: output to more than one table, if it would otherwise be too large.
	var (
		fileNum  uint64
		filename string
		tw       *table.Writer
	)
	defer func() {
		if iter != nil {
			retErr = firstError(retErr, iter.Close())
		}
		if tw != nil {
			retErr = firstError(retErr, tw.Close())
		}
		if retErr != nil {
			d.opts.GetFileSystem().Remove(filename)
		}
	}()

	currentUkey := make([]byte, 0, 4096)
	hasCurrentUkey := false
	lastSeqNumForKey := internalKeySeqNumMax
	smallest, largest := internalKey(nil), internalKey(nil)
	for iter.Next() {
		// TODO: prioritize compacting d.imm.

		// TODO: support c.shouldStopBefore.

		ikey := internalKey(iter.Key())
		if !ikey.valid() {
			// Do not hide invalid keys.
			currentUkey = currentUkey[:0]
			hasCurrentUkey = false
			lastSeqNumForKey = internalKeySeqNumMax

		} else {
			ukey := ikey.ukey()
			if !hasCurrentUkey || d.icmp.userCmp.Compare(currentUkey, ukey) != 0 {
				// This is the first occurrence of this user key.
				currentUkey = append(currentUkey[:0], ukey...)
				hasCurrentUkey = true
				lastSeqNumForKey = internalKeySeqNumMax
			}

			drop, ikeySeqNum := false, ikey.seqNum()
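			// Rule (A): iteration visits entries for the same user key from
			// newest to oldest. If the previously seen (newer) entry for this
			// user key is already visible at the smallest snapshot, then no
			// snapshot can observe this older entry, so it can be dropped.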
			if lastSeqNumForKey <= smallestSnapshot {
				drop = true // Rule (A) referenced below.

			} else if ikey.kind() == internalKeyKindDelete &&
				ikeySeqNum <= smallestSnapshot &&
				c.isBaseLevelForUkey(d.icmp.userCmp, ukey) {

				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger sequence numbers
				// (3) data in layers that are being compacted here and have
				//     smaller sequence numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				drop = true
			}

			lastSeqNumForKey = ikeySeqNum
			if drop {
				continue
			}
		}

		if tw == nil {
			d.mu.Lock()
			fileNum = d.versions.nextFileNum()
			d.pendingOutputs[fileNum] = struct{}{}
			pendingOutputs = append(pendingOutputs, fileNum)
			d.mu.Unlock()

			filename = dbFilename(d.dirname, fileTypeTable, fileNum)
			file, err := d.opts.GetFileSystem().Create(filename)
			if err != nil {
				return nil, pendingOutputs, err
			}
			tw = table.NewWriter(file, &d.icmpOpts)

			smallest = make(internalKey, len(ikey))
			copy(smallest, ikey)
			largest = make(internalKey, 0, 2*len(ikey))
		}
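		// Copy the key bytes into largest rather than retaining ikey itself;
		// the iterator may reuse its key buffer on the next call to Next.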
		largest = append(largest[:0], ikey...)
		if err := tw.Set(ikey, iter.Value(), nil); err != nil {
			return nil, pendingOutputs, err
		}
	}

	ve = &versionEdit{
		deletedFiles: map[deletedFileEntry]bool{},
		newFiles: []newFileEntry{
			{
				level: c.level + 1,
				meta: fileMetadata{
					fileNum:  fileNum,
					size:     1,
					smallest: smallest,
					largest:  largest,
				},
			},
		},
	}
	for i := 0; i < 2; i++ {
		for _, f := range c.inputs[i] {
			ve.deletedFiles[deletedFileEntry{
				level:   c.level + i,
				fileNum: f.fileNum,
			}] = true
		}
	}
	return ve, pendingOutputs, nil
}

// compactionIterator returns an iterator over all the tables in a compaction.
func compactionIterator(tc *tableCache, icmp db.Comparer, c *compaction) (cIter db.Iterator, retErr error) {
	iters := make([]db.Iterator, 0, len(c.inputs[0])+1)
	defer func() {
		if retErr != nil {
			for _, iter := range iters {
				if iter != nil {
					iter.Close()
				}
			}
		}
	}()
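
	// Tables in levels other than 0 have disjoint, sorted key ranges, so a
	// single concatenating iterator over them suffices. Level-0 tables may
	// overlap one another, so each gets its own iterator and the merging
	// iterator below interleaves them.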
	if c.level != 0 {
		iter, err := newConcatenatingIterator(tc, c.inputs[0])
		if err != nil {
			return nil, err
		}
		iters = append(iters, iter)
	} else {
		for _, f := range c.inputs[0] {
			iter, err := tc.find(f.fileNum, nil)
			if err != nil {
				return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
			}
			iters = append(iters, iter)
		}
	}

	iter, err := newConcatenatingIterator(tc, c.inputs[1])
	if err != nil {
		return nil, err
	}
	iters = append(iters, iter)
	return db.NewMergingIterator(icmp, iters...), nil
}

// newConcatenatingIterator returns a concatenating iterator over all of the
// input tables.
func newConcatenatingIterator(tc *tableCache, inputs []fileMetadata) (cIter db.Iterator, retErr error) {
	iters := make([]db.Iterator, len(inputs))
	defer func() {
		if retErr != nil {
			for _, iter := range iters {
				if iter != nil {
					iter.Close()
				}
			}
		}
	}()

	for i, f := range inputs {
		iter, err := tc.find(f.fileNum, nil)
		if err != nil {
			return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
		}
		iters[i] = iter
	}
	return db.NewConcatenatingIterator(iters...), nil
}