Mirror of https://github.com/golang/leveldb.git

leveldb: implement compactions.

R=bradfitz
CC=golang-dev
https://codereview.appspot.com/15580052

Parent: 5073d6066a
Commit: c48121c1ef
@@ -5,12 +5,20 @@
package leveldb

import (
	"fmt"

	"code.google.com/p/leveldb-go/leveldb/db"
	"code.google.com/p/leveldb-go/leveldb/table"
)

const (
	targetFileSize = 2 * 1024 * 1024

	// maxGrandparentOverlapBytes is the maximum bytes of overlap with
	// level+2 before we stop building a single file in a level to level+1
	// compaction.
	maxGrandparentOverlapBytes = 10 * targetFileSize

	// expandedCompactionByteSizeLimit is the maximum number of bytes in
	// all compacted files. We avoid expanding the lower level file set of
	// a compaction if it would make the total compaction cover more than
@@ -129,3 +137,299 @@ func (c *compaction) isBaseLevelForUkey(userCmp db.Comparer, ukey []byte) bool {
	}
	return true
}

// maybeScheduleCompaction schedules a compaction if necessary.
//
// d.mu must be held when calling this.
func (d *DB) maybeScheduleCompaction() {
	if d.compacting {
		return
	}
	// TODO: check if db is shutting down.
	// TODO: check for manual compactions.
	if d.imm == nil {
		v := d.versions.currentVersion()
		// TODO: check v.fileToCompact.
		if v.compactionScore < 1 {
			// There is no work to be done.
			return
		}
	}
	d.compacting = true
	go d.compact()
}

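The scheduling pattern above (a compacting flag guarded by d.mu, at most one background goroutine, and a reschedule when it finishes) is easier to see in isolation. Below is a minimal, self-contained sketch of that pattern; the names (scheduler, pending) are illustrative and not part of the package:

package main

import (
	"fmt"
	"sync"
)

// scheduler runs at most one background job at a time, mirroring the
// d.compacting flag above and the reschedule-on-finish step in compact below.
type scheduler struct {
	mu      sync.Mutex
	cond    *sync.Cond
	running bool
	pending int // stand-in for "compactionScore >= 1"
}

// maybeSchedule starts a background job if one is needed and none is running.
// s.mu must be held by the caller, as with maybeScheduleCompaction.
func (s *scheduler) maybeSchedule() {
	if s.running || s.pending == 0 {
		return
	}
	s.running = true
	go s.run()
}

// run does one unit of work, clears the running flag, reschedules if more
// work remains, and wakes any waiters, just as compact does below.
func (s *scheduler) run() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending--
	s.running = false
	s.maybeSchedule()
	s.cond.Broadcast()
}

func main() {
	s := &scheduler{pending: 3}
	s.cond = sync.NewCond(&s.mu)

	s.mu.Lock()
	s.maybeSchedule()
	for s.pending > 0 || s.running {
		s.cond.Wait() // releases s.mu while waiting, like compactionCond
	}
	s.mu.Unlock()
	fmt.Println("all background work finished")
}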
// compact runs one compaction and maybe schedules another call to compact.
func (d *DB) compact() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if err := d.compact1(); err != nil {
		// TODO: count consecutive compaction errors and backoff.
	}
	d.compacting = false
	// The previous compaction may have produced too many files in a
	// level, so reschedule another compaction if needed.
	d.maybeScheduleCompaction()
	d.compactionCond.Broadcast()
}

// compact1 runs one compaction.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compact1() error {
	if d.imm != nil {
		return d.compactMemTable()
	}

	// TODO: support manual compactions.

	c := pickCompaction(&d.versions)
	if c == nil {
		return nil
	}

	// Check for a trivial move of one table from one level to the next.
	// We avoid such a move if there is lots of overlapping grandparent data.
	// Otherwise, the move could create a parent file that will require
	// a very expensive merge later on.
	if len(c.inputs[0]) == 1 && len(c.inputs[1]) == 0 &&
		totalSize(c.inputs[2]) <= maxGrandparentOverlapBytes {

		meta := &c.inputs[0][0]
		return d.versions.logAndApply(d.dirname, &versionEdit{
			deletedFiles: map[deletedFileEntry]bool{
				deletedFileEntry{level: c.level, fileNum: meta.fileNum}: true,
			},
			newFiles: []newFileEntry{
				{level: c.level + 1, meta: *meta},
			},
		})
	}

	ve, err := d.compactDiskTables(c)
	if err != nil {
		return err
	}
	if err := d.versions.logAndApply(d.dirname, ve); err != nil {
		return err
	}
	return d.deleteObsoleteFiles()
}

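The trivial-move test in compact1 boils down to a size check against the grandparent level. A small self-contained sketch of that decision (tableMeta and the example* constants are illustrative stand-ins for the package's fileMetadata and constants):

package main

import "fmt"

// tableMeta is a stand-in for the package's fileMetadata; only size matters here.
type tableMeta struct{ size uint64 }

const (
	exampleTargetFileSize             = 2 * 1024 * 1024
	exampleMaxGrandparentOverlapBytes = 10 * exampleTargetFileSize // 20 MiB
)

func exampleTotalSize(tables []tableMeta) (n uint64) {
	for _, t := range tables {
		n += t.size
	}
	return n
}

// isTrivialMove reports whether a compaction can be done by simply moving one
// level-N table into level N+1: a single input at level N, nothing overlapping
// at level N+1, and little enough overlap with level N+2 that the moved file
// will not force an expensive merge later.
func isTrivialMove(levelN, levelN1, grandparents []tableMeta) bool {
	return len(levelN) == 1 &&
		len(levelN1) == 0 &&
		exampleTotalSize(grandparents) <= exampleMaxGrandparentOverlapBytes
}

func main() {
	input := []tableMeta{{size: 4 << 20}}

	light := []tableMeta{{size: 1 << 20}} // 1 MiB of grandparent overlap
	heavy := make([]tableMeta, 15)        // 15 * 2 MiB = 30 MiB of overlap
	for i := range heavy {
		heavy[i] = tableMeta{size: exampleTargetFileSize}
	}

	fmt.Println(isTrivialMove(input, nil, light)) // true: just move the file
	fmt.Println(isTrivialMove(input, nil, heavy)) // false: do a real merge
}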
// compactMemTable runs a compaction that copies d.imm from memory to disk.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compactMemTable() error {
	meta, err := d.writeLevel0Table(d.opts.GetFileSystem(), d.imm)
	if err != nil {
		return err
	}
	err = d.versions.logAndApply(d.dirname, &versionEdit{
		logNumber: d.logNumber,
		newFiles: []newFileEntry{
			{level: 0, meta: meta},
		},
	})
	if err != nil {
		return err
	}
	d.imm = nil
	return d.deleteObsoleteFiles()
}

// compactDiskTables runs a compaction that produces new on-disk tables from
// old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
	// TODO: track snapshots.
	smallestSnapshot := d.versions.lastSequence

	// Release the d.mu lock while doing I/O.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	iter, err := compactionIterator(&d.tableCache, d.icmp, c)
	if err != nil {
		return nil, err
	}

	// TODO: output to more than one table, if it would otherwise be too large.
	var (
		fileNum  uint64
		filename string
		tw       *table.Writer
	)
	defer func() {
		if iter != nil {
			retErr = firstError(retErr, iter.Close())
		}
		if tw != nil {
			retErr = firstError(retErr, tw.Close())
		}
		if retErr != nil {
			d.opts.GetFileSystem().Remove(filename)
		}
	}()

	currentUkey := make([]byte, 0, 4096)
	hasCurrentUkey := false
	lastSeqNumForKey := internalKeySeqNumMax
	smallest, largest := internalKey(nil), internalKey(nil)
	for iter.Next() {
		// TODO: prioritize compacting d.imm.

		// TODO: support c.shouldStopBefore.

		ikey := internalKey(iter.Key())
		if !ikey.valid() {
			// Do not hide invalid keys.
			currentUkey = currentUkey[:0]
			hasCurrentUkey = false
			lastSeqNumForKey = internalKeySeqNumMax

		} else {
			ukey := ikey.ukey()
			if !hasCurrentUkey || d.icmp.userCmp.Compare(currentUkey, ukey) != 0 {
				// This is the first occurrence of this user key.
				currentUkey = append(currentUkey[:0], ukey...)
				hasCurrentUkey = true
				lastSeqNumForKey = internalKeySeqNumMax
			}

			drop, ikeySeqNum := false, ikey.seqNum()
			if lastSeqNumForKey <= smallestSnapshot {
				drop = true // Rule (A) referenced below.

			} else if ikey.kind() == internalKeyKindDelete &&
				ikeySeqNum <= smallestSnapshot &&
				c.isBaseLevelForUkey(d.icmp.userCmp, ukey) {

				// For this user key:
				// (1) there is no data in higher levels
				// (2) data in lower levels will have larger sequence numbers
				// (3) data in layers that are being compacted here and have
				//     smaller sequence numbers will be dropped in the next
				//     few iterations of this loop (by rule (A) above).
				// Therefore this deletion marker is obsolete and can be dropped.
				drop = true
			}

			lastSeqNumForKey = ikeySeqNum
			if drop {
				continue
			}
		}

		if tw == nil {
			d.mu.Lock()
			fileNum = d.versions.nextFileNum()
			// TODO: track pending outputs.
			d.mu.Unlock()

			filename = dbFilename(d.dirname, fileTypeTable, fileNum)
			file, err := d.opts.GetFileSystem().Create(filename)
			if err != nil {
				return nil, err
			}
			tw = table.NewWriter(file, &d.icmpOpts)

			smallest = make(internalKey, len(ikey))
			copy(smallest, ikey)
			largest = make(internalKey, 0, 2*len(ikey))
		}
		largest = append(largest[:0], ikey...)
		if err := tw.Set(ikey, iter.Value(), nil); err != nil {
			return nil, err
		}
	}

	ve = &versionEdit{
		deletedFiles: map[deletedFileEntry]bool{},
		newFiles: []newFileEntry{
			{
				level: c.level + 1,
				meta: fileMetadata{
					fileNum:  fileNum,
					size:     1,
					smallest: smallest,
					largest:  largest,
				},
			},
		},
	}
	for i := 0; i < 2; i++ {
		for _, f := range c.inputs[i] {
			ve.deletedFiles[deletedFileEntry{
				level:   c.level + i,
				fileNum: f.fileNum,
			}] = true
		}
	}
	return ve, nil
}

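The dropping rules in the loop above — rule (A), which hides older entries of a user key once a newer entry at or below the smallest snapshot has been seen, and the rule for dropping obsolete deletion markers at the base level — can be exercised on a toy sequence of entries. A self-contained sketch with simplified entries (the real code works on encoded internal keys; entry, keyKind, and compactKeys are illustrative names):

package main

import "fmt"

type keyKind int

const (
	kindDelete keyKind = iota
	kindSet
)

// entry is a simplified stand-in for an internal key. Entries for one user key
// arrive newest first (descending sequence number), as they do from the
// compaction iterator.
type entry struct {
	ukey string
	seq  uint64
	kind keyKind
}

// compactKeys applies the two dropping rules from compactDiskTables:
// (A) once an entry of a user key with seqNum <= smallestSnapshot has been
//     seen, every older entry of that key is shadowed and can be dropped;
// (B) a deletion marker at or below smallestSnapshot can itself be dropped if
//     this is the base level for its user key (no data survives below it).
func compactKeys(in []entry, smallestSnapshot uint64, isBaseLevel func(string) bool) []entry {
	const seqNumMax = uint64(1<<56 - 1)
	var out []entry
	currentUkey, hasCurrentUkey := "", false
	lastSeqNumForKey := seqNumMax
	for _, e := range in {
		if !hasCurrentUkey || e.ukey != currentUkey {
			// First occurrence of this user key.
			currentUkey, hasCurrentUkey = e.ukey, true
			lastSeqNumForKey = seqNumMax
		}
		drop := false
		if lastSeqNumForKey <= smallestSnapshot {
			drop = true // rule (A)
		} else if e.kind == kindDelete && e.seq <= smallestSnapshot && isBaseLevel(e.ukey) {
			drop = true // rule (B): obsolete deletion marker
		}
		lastSeqNumForKey = e.seq
		if !drop {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	in := []entry{
		{"apple", 9, kindSet},     // kept: newest value
		{"apple", 5, kindSet},     // kept: 9 > snapshot 7, so a snapshot may still read seq 5
		{"apple", 3, kindSet},     // dropped by (A): shadowed by seq 5 <= 7
		{"banana", 4, kindDelete}, // dropped by (B): obsolete deletion marker
		{"banana", 2, kindSet},    // dropped by (A): shadowed by the deletion at seq 4
	}
	for _, e := range compactKeys(in, 7, func(string) bool { return true }) {
		fmt.Println(e.ukey, e.seq) // prints "apple 9" and "apple 5"
	}
}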
// compactionIterator returns an iterator over all the tables in a compaction.
func compactionIterator(tc *tableCache, icmp db.Comparer, c *compaction) (cIter db.Iterator, retErr error) {
	iters := make([]db.Iterator, 0, len(c.inputs[0])+1)
	defer func() {
		if retErr != nil {
			for _, iter := range iters {
				if iter != nil {
					iter.Close()
				}
			}
		}
	}()

	if c.level != 0 {
		iter, err := newConcatenatingIterator(tc, c.inputs[0])
		if err != nil {
			return nil, err
		}
		iters = append(iters, iter)
	} else {
		for _, f := range c.inputs[0] {
			iter, err := tc.find(f.fileNum, nil)
			if err != nil {
				return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
			}
			iters = append(iters, iter)
		}
	}

	iter, err := newConcatenatingIterator(tc, c.inputs[1])
	if err != nil {
		return nil, err
	}
	iters = append(iters, iter)
	return db.NewMergingIterator(icmp, iters...), nil
}

// newConcatenatingIterator returns a concatenating iterator over all of the
// input tables.
func newConcatenatingIterator(tc *tableCache, inputs []fileMetadata) (cIter db.Iterator, retErr error) {
	iters := make([]db.Iterator, len(inputs))
	defer func() {
		if retErr != nil {
			for _, iter := range iters {
				if iter != nil {
					iter.Close()
				}
			}
		}
	}()

	for i, f := range inputs {
		iter, err := tc.find(f.fileNum, nil)
		if err != nil {
			return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
		}
		iters[i] = iter
	}
	return db.NewConcatenatingIterator(iters...), nil
}

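compactionIterator treats level 0 differently from deeper levels: level-0 tables may overlap, so each gets its own iterator and the streams are merged, while tables within any deeper level have disjoint, ordered key ranges and can simply be concatenated. A self-contained sketch of that distinction over plain sorted string slices (mergeSorted and concat are illustrative, not the db package's iterators):

package main

import (
	"fmt"
	"sort"
)

// concat is enough for a level >= 1: tables there have disjoint, ordered key
// ranges, so walking them back to back already yields a sorted stream.
func concat(tables [][]string) []string {
	var out []string
	for _, t := range tables {
		out = append(out, t...)
	}
	return out
}

// mergeSorted is needed for level 0: tables may overlap, so their streams must
// be merged key by key (a plain sort stands in for db.NewMergingIterator).
func mergeSorted(tables [][]string) []string {
	out := concat(tables)
	sort.Strings(out)
	return out
}

func main() {
	level0 := [][]string{{"a", "m"}, {"b", "z"}} // overlapping ranges
	level1 := [][]string{{"a", "b"}, {"m", "z"}} // disjoint ranges

	fmt.Println(mergeSorted(level0)) // [a b m z]
	fmt.Println(concat(level1))      // [a b m z]
}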
@@ -5,12 +5,16 @@
package leveldb

import (
	"bytes"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"testing"
	"time"

	"code.google.com/p/leveldb-go/leveldb/db"
	"code.google.com/p/leveldb-go/leveldb/table"
)

func TestPickCompaction(t *testing.T) {
@@ -541,3 +545,113 @@ func TestIsBaseLevelForUkey(t *testing.T) {
		}
	}
}

func TestCompaction(t *testing.T) {
	const writeBufferSize = 1000

	// TODO: implement func Create instead of Open'ing a pre-existing empty DB.
	fs, err := cloneFileSystem(db.DefaultFileSystem, "../testdata/db-stage-1")
	if err != nil {
		t.Fatalf("cloneFileSystem failed: %v", err)
	}
	d, err := Open("", &db.Options{
		FileSystem:      fs,
		WriteBufferSize: writeBufferSize,
	})
	if err != nil {
		t.Fatalf("Open: %v", err)
	}

	get1 := func(x db.DB) (ret string) {
		b := &bytes.Buffer{}
		iter := x.Find(nil, nil)
		for iter.Next() {
			b.Write(internalKey(iter.Key()).ukey())
		}
		if err := iter.Close(); err != nil {
			t.Fatalf("iterator Close: %v", err)
		}
		return b.String()
	}
	getAll := func() (gotMem, gotDisk string, err error) {
		d.mu.Lock()
		defer d.mu.Unlock()

		if d.mem != nil {
			gotMem = get1(d.mem)
		}
		ss := []string(nil)
		v := d.versions.currentVersion()
		for _, files := range v.files {
			for _, meta := range files {
				f, err := fs.Open(dbFilename("", fileTypeTable, meta.fileNum))
				if err != nil {
					return "", "", fmt.Errorf("Open: %v", err)
				}
				defer f.Close()
				r := table.NewReader(f, &db.Options{
					Comparer: internalKeyComparer{db.DefaultComparer},
				})
				defer r.Close()
				ss = append(ss, get1(r)+".")
			}
		}
		sort.Strings(ss)
		return gotMem, strings.Join(ss, ""), nil
	}

	value := bytes.Repeat([]byte("x"), writeBufferSize*6/10)
	testCases := []struct {
		key, wantMem, wantDisk string
	}{
		{"+A", "A", ""},
		{"+a", "Aa", ""},
		{"+B", "B", "Aa."},
		{"+b", "Bb", "Aa."},
		// The next level-0 table overwrites the B key.
		{"+C", "C", "Aa.Bb."},
		{"+B", "BC", "Aa.Bb."},
		// The next level-0 table deletes the a key.
		{"+D", "D", "Aa.BC.Bb."},
		{"-a", "Da", "Aa.BC.Bb."},
		{"+d", "Dad", "Aa.BC.Bb."},
		// The next addition creates the fourth level-0 table, and l0CompactionTrigger == 4,
		// so this triggers a non-trivial compaction into one level-1 table. Note that the
		// keys in this one larger table are interleaved from the four smaller ones.
		{"+E", "E", "ABCDbd."},
		{"+e", "Ee", "ABCDbd."},
		{"+F", "F", "ABCDbd.Ee."},
	}
	for _, tc := range testCases {
		if key := tc.key[1:]; tc.key[0] == '+' {
			if err := d.Set([]byte(key), value, nil); err != nil {
				t.Errorf("%q: Set: %v", key, err)
				break
			}
		} else {
			if err := d.Delete([]byte(key), nil); err != nil {
				t.Errorf("%q: Delete: %v", key, err)
				break
			}
		}

		// Allow any writes to the memfs to complete.
		time.Sleep(1 * time.Millisecond)

		gotMem, gotDisk, err := getAll()
		if err != nil {
			t.Errorf("%q: %v", tc.key, err)
			break
		}
		if gotMem != tc.wantMem {
			t.Errorf("%q: mem: got %q, want %q", tc.key, gotMem, tc.wantMem)
		}
		if gotDisk != tc.wantDisk {
			t.Errorf("%q: sst: got %q, want %q", tc.key, gotDisk, tc.wantDisk)
		}
	}

	if err := d.Close(); err != nil {
		t.Fatalf("db Close: %v", err)
	}
}

@@ -33,6 +33,7 @@
// - BlockRestartInterval
// - BlockSize
// - Compression
// - WriteBufferSize
type Options struct {
	// BlockRestartInterval is the number of keys between restart points
	// for delta encoding of keys.
@@ -68,6 +69,18 @@ type Options struct {
	// The default value is 1000.
	MaxOpenFiles int

	// WriteBufferSize is the amount of data to build up in memory (backed by
	// an unsorted log on disk) before converting to a sorted on-disk file.
	//
	// Larger values increase performance, especially during bulk loads. Up to
	// two write buffers may be held in memory at the same time, so you may
	// wish to adjust this parameter to control memory usage. Also, a larger
	// write buffer will result in a longer recovery time the next time the
	// database is opened.
	//
	// The default value is 4MiB.
	WriteBufferSize int

	// VerifyChecksums is whether to verify the per-block checksums in a DB.
	//
	// The default value is false.
@@ -117,6 +130,13 @@ func (o *Options) GetMaxOpenFiles() int {
	return o.MaxOpenFiles
}

func (o *Options) GetWriteBufferSize() int {
	if o == nil || o.WriteBufferSize <= 0 {
		return 4 * 1024 * 1024
	}
	return o.WriteBufferSize
}

func (o *Options) GetVerifyChecksums() bool {
	if o == nil {
		return false
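The tests in this change open a database with a deliberately tiny WriteBufferSize to force early memtable flushes; the same option shape applies to ordinary use. A sketch of opening a DB with a custom write buffer, using only calls that appear in this diff (the path is illustrative, and note that the tests still clone a pre-existing empty DB because a Create function is not implemented yet, so treat this as the intended shape rather than something guaranteed to work against an empty directory at this commit):

package main

import (
	"log"

	"code.google.com/p/leveldb-go/leveldb"
	"code.google.com/p/leveldb-go/leveldb/db"
)

func main() {
	// A larger write buffer means fewer, larger level-0 tables and better bulk
	// load throughput, at the cost of memory and a longer log replay on open.
	d, err := leveldb.Open("/tmp/exampledb", &db.Options{
		WriteBufferSize: 16 * 1024 * 1024, // default is 4 MiB
	})
	if err != nil {
		log.Fatal(err)
	}
	defer d.Close()

	if err := d.Set([]byte("hello"), []byte("world"), nil); err != nil {
		log.Fatal(err)
	}
}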
@@ -37,6 +37,9 @@ const (
	internalKeyKindMax internalKeyKind = 1
)

// internalKeySeqNumMax is the largest valid sequence number.
const internalKeySeqNumMax = uint64(1<<56 - 1)

// makeInternalKey makes an internalKey from a user key, a kind, and a sequence
// number. The return value may be a slice of dst[:cap(dst)] if it is large
// enough. Otherwise, it may be a slice of a newly allocated buffer. In any
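The value 1<<56 - 1 comes from packing the sequence number and the key kind into a single 8-byte trailer: 56 bits of sequence plus 8 bits of kind. A sketch of that arithmetic, assuming the conventional LevelDB layout (little-endian trailer with the kind in the low byte); this is illustrative only, and makeInternalKey in the package remains the authoritative encoding:

package main

import (
	"encoding/binary"
	"fmt"
)

// makeTrailer packs a sequence number and a kind into an 8-byte trailer:
// 56 bits of sequence, 8 bits of kind (assumed layout, for illustration).
func makeTrailer(seq uint64, kind uint8) [8]byte {
	var t [8]byte
	binary.LittleEndian.PutUint64(t[:], seq<<8|uint64(kind))
	return t
}

func splitTrailer(t [8]byte) (seq uint64, kind uint8) {
	x := binary.LittleEndian.Uint64(t[:])
	return x >> 8, uint8(x)
}

func main() {
	const seqNumMax = uint64(1<<56 - 1) // matches internalKeySeqNumMax above
	t := makeTrailer(seqNumMax, 1)
	seq, kind := splitTrailer(t)
	fmt.Println(seq == seqNumMax, kind) // true 1
}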
@@ -15,6 +15,7 @@ import (
	"path/filepath"
	"sort"
	"sync"
	"time"

	"code.google.com/p/leveldb-go/leveldb/db"
	"code.google.com/p/leveldb-go/leveldb/memdb"
@@ -27,6 +28,14 @@ const (
	// starts.
	l0CompactionTrigger = 4

	// l0SlowdownWritesTrigger is the soft limit on number of level-0 files.
	// We slow down writes at this point.
	l0SlowdownWritesTrigger = 8

	// l0StopWritesTrigger is the maximum number of level-0 files. We stop
	// writes at this point.
	l0StopWritesTrigger = 12

	// minTableCacheSize is the minimum size of the table cache.
	minTableCacheSize = 64

@@ -49,9 +58,10 @@ type DB struct {
	// below.
	mu sync.Mutex

	fileLock io.Closer
	logFile  db.File
	log      *record.Writer
	fileLock  io.Closer
	logNumber uint64
	logFile   db.File
	log       *record.Writer

	versions versionSet

@@ -61,6 +71,9 @@ type DB struct {
	// higher than imm's, and imm's sequence numbers are all higher than
	// those on-disk.
	mem, imm *memdb.MemDB

	compactionCond sync.Cond
	compacting     bool
}

var _ db.DB = (*DB)(nil)
@@ -72,12 +85,13 @@ func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) {
	current := d.versions.currentVersion()
	// TODO: do we need to ref-count the current version, so that we don't
	// delete its underlying files if we have a concurrent compaction?
	memtables := [2]*memdb.MemDB{d.mem, d.imm}
	d.mu.Unlock()

	ikey := makeInternalKey(nil, key, internalKeyKindMax, snapshot)

	// Look in the memtables before going to the on-disk current version.
	for _, mem := range [2]*memdb.MemDB{d.mem, d.imm} {
	for _, mem := range memtables {
		if mem == nil {
			continue
		}
@@ -116,14 +130,16 @@ func (d *DB) Apply(batch Batch, opts *db.WriteOptions) error {
	d.mu.Lock()
	defer d.mu.Unlock()

	// TODO: compact d.mem if there is not enough room for the batch.
	// This may require temporarily releasing d.mu.
	if err := d.makeRoomForWrite(false); err != nil {
		return err
	}

	seqNum := d.versions.lastSequence + 1
	batch.setSeqNum(seqNum)
	d.versions.lastSequence += uint64(n)

	// Write the batch to the log.
	// TODO: drop and re-acquire d.mu around the I/O.
	w, err := d.log.Next()
	if err != nil {
		return fmt.Errorf("leveldb: could not create log entry: %v", err)
@@ -200,8 +216,12 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
	}
	d.tableCache.init(dirname, opts.GetFileSystem(), &d.icmpOpts, tableCacheSize)
	d.mem = memdb.New(&d.icmpOpts)
	d.compactionCond = sync.Cond{L: &d.mu}
	fs := opts.GetFileSystem()

	d.mu.Lock()
	defer d.mu.Unlock()

	// Lock the database directory.
	err := fs.MkdirAll(dirname, 0755)
	if err != nil {
@@ -252,6 +272,7 @@ func Open(dirname string, opts *db.Options) (*DB, error) {

	// Create an empty .log file.
	ve.logNumber = d.versions.nextFileNum()
	d.logNumber = ve.logNumber
	logFile, err := fs.Create(dbFilename(dirname, fileTypeLog, ve.logNumber))
	if err != nil {
		return nil, err
@@ -268,14 +289,20 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
		return nil, err
	}

	// TODO: delete obsolete files.
	// TODO: maybe schedule compaction?
	if err := d.deleteObsoleteFiles(); err != nil {
		return nil, err
	}
	d.maybeScheduleCompaction()

	d.logFile, logFile = logFile, nil
	d.fileLock, fileLock = fileLock, nil
	return d, nil
}

// replayLogFile replays the edits in the named log file.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayLogFile(ve *versionEdit, fs db.FileSystem, filename string) (maxSeqNum uint64, err error) {
	file, err := fs.Open(filename)
	if err != nil {
@@ -313,9 +340,7 @@ func (d *DB) replayLogFile(ve *versionEdit, fs db.FileSystem, filename string) (
		}

		if mem == nil {
			mem = memdb.New(&db.Options{
				Comparer: d.icmp,
			})
			mem = memdb.New(&d.icmpOpts)
		}

		t := b.iter()
@@ -372,6 +397,10 @@ func firstError(err0, err1 error) error {
	return err1
}

// writeLevel0Table writes a memtable to a level-0 on-disk table.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMetadata, err error) {
	meta.fileNum = d.versions.nextFileNum()
	filename := dbFilename(d.dirname, fileTypeTable, meta.fileNum)
@@ -380,6 +409,11 @@ func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMeta
	// It is the caller's responsibility to remove that fileNum from the
	// set of pending outputs.

	// Release the d.mu lock while doing I/O.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	var (
		file db.File
		tw   *table.Writer
@@ -460,3 +494,77 @@ func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMeta

	return meta, nil
}

// makeRoomForWrite ensures that there is room in d.mem for the next write.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) makeRoomForWrite(force bool) error {
	allowDelay := !force
	for {
		// TODO: check any previous sticky error, if the paranoid option is set.

		if allowDelay && len(d.versions.currentVersion().files[0]) > l0SlowdownWritesTrigger {
			// We are getting close to hitting a hard limit on the number of
			// L0 files. Rather than delaying a single write by several
			// seconds when we hit the hard limit, start delaying each
			// individual write by 1ms to reduce latency variance.
			d.mu.Unlock()
			time.Sleep(1 * time.Millisecond)
			d.mu.Lock()
			allowDelay = false
			// TODO: how do we ensure we are still 'at the front of the writer queue'?
			continue
		}

		if !force && d.mem.ApproximateMemoryUsage() <= d.opts.GetWriteBufferSize() {
			// There is room in the current memtable.
			break
		}

		if d.imm != nil {
			// We have filled up the current memtable, but the previous
			// one is still being compacted, so we wait.
			d.compactionCond.Wait()
			continue
		}

		if len(d.versions.currentVersion().files[0]) > l0StopWritesTrigger {
			// There are too many level-0 files.
			d.compactionCond.Wait()
			continue
		}

		// Attempt to switch to a new memtable and trigger compaction of old
		// TODO: drop and re-acquire d.mu around the I/O.
		newLogNumber := d.versions.nextFileNum()
		newLogFile, err := d.opts.GetFileSystem().Create(dbFilename(d.dirname, fileTypeLog, newLogNumber))
		if err != nil {
			return err
		}
		newLog := record.NewWriter(newLogFile)
		if err := d.log.Close(); err != nil {
			newLogFile.Close()
			return err
		}
		if err := d.logFile.Close(); err != nil {
			newLog.Close()
			newLogFile.Close()
			return err
		}
		d.logNumber, d.logFile, d.log = newLogNumber, newLogFile, newLog
		d.imm, d.mem = d.mem, memdb.New(&d.icmpOpts)
		force = false
		d.maybeScheduleCompaction()
	}
	return nil
}

// deleteObsoleteFiles deletes those files that are no longer needed.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) deleteObsoleteFiles() error {
	// TODO: implement.
	return nil
}

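makeRoomForWrite implements three levels of write backpressure keyed off the level-0 file count: a 1ms delay once l0SlowdownWritesTrigger (8) files accumulate, a hard wait on the compaction condition while the previous memtable is still being flushed or once l0StopWritesTrigger (12) is reached, and otherwise a memtable/log swap. A self-contained sketch of that decision ladder (types and the example* constants are illustrative, not the package's; the force flag is ignored to keep it small):

package main

import "fmt"

const (
	exampleL0SlowdownWritesTrigger = 8
	exampleL0StopWritesTrigger     = 12
	exampleWriteBufferSize         = 4 << 20
)

type action int

const (
	proceed      action = iota // there is room in the current memtable
	delayOneMs                 // soft limit: inject a 1ms delay, then re-check
	waitForFlush               // block until a background compaction finishes
	swapMemtable               // rotate to a fresh memtable and log file
)

func (a action) String() string {
	return [...]string{"proceed", "delayOneMs", "waitForFlush", "swapMemtable"}[a]
}

// nextAction mirrors one pass of makeRoomForWrite's loop.
func nextAction(allowDelay bool, l0Files, memUsage int, immFlushing bool) action {
	switch {
	case allowDelay && l0Files > exampleL0SlowdownWritesTrigger:
		return delayOneMs
	case memUsage <= exampleWriteBufferSize:
		return proceed
	case immFlushing:
		return waitForFlush
	case l0Files > exampleL0StopWritesTrigger:
		return waitForFlush
	default:
		return swapMemtable
	}
}

func main() {
	fmt.Println(nextAction(true, 9, 1<<20, false))   // delayOneMs: over the soft L0 limit
	fmt.Println(nextAction(false, 9, 1<<20, false))  // proceed: memtable still has room
	fmt.Println(nextAction(false, 9, 5<<20, true))   // waitForFlush: previous memtable still flushing
	fmt.Println(nextAction(false, 13, 5<<20, false)) // waitForFlush: too many level-0 files
	fmt.Println(nextAction(false, 9, 5<<20, false))  // swapMemtable
}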
@@ -5,8 +5,11 @@
package leveldb

import (
	"bytes"
	"io"
	"math/rand"
	"os"
	"strconv"
	"strings"
	"testing"

@@ -307,3 +310,64 @@ func TestBasicWrites(t *testing.T) {
		t.Fatalf("Close failed: %v", err)
	}
}

func TestRandomWrites(t *testing.T) {
	// TODO: implement func Create instead of Open'ing a pre-existing empty DB.
	fs, err := cloneFileSystem(db.DefaultFileSystem, "../testdata/db-stage-1")
	if err != nil {
		t.Fatalf("cloneFileSystem failed: %v", err)
	}
	d, err := Open("", &db.Options{
		FileSystem:      fs,
		WriteBufferSize: 8 * 1024,
	})
	if err != nil {
		t.Fatalf("Open: %v", err)
	}

	keys := [64][]byte{}
	wants := [64]int{}
	for k := range keys {
		keys[k] = []byte(strconv.Itoa(k))
		wants[k] = -1
	}
	xxx := bytes.Repeat([]byte("x"), 512)

	rng := rand.New(rand.NewSource(123))
	const N = 1000
	for i := 0; i < N; i++ {
		k := rng.Intn(len(keys))
		if rng.Intn(20) != 0 {
			wants[k] = rng.Intn(len(xxx) + 1)
			if err := d.Set(keys[k], xxx[:wants[k]], nil); err != nil {
				t.Fatalf("i=%d: Set: %v", i, err)
			}
		} else {
			wants[k] = -1
			if err := d.Delete(keys[k], nil); err != nil {
				t.Fatalf("i=%d: Delete: %v", i, err)
			}
		}

		if i != N-1 || rng.Intn(50) != 0 {
			continue
		}
		for k := range keys {
			got := -1
			if v, err := d.Get(keys[k], nil); err != nil {
				if err != db.ErrNotFound {
					t.Fatalf("Get: %v", err)
				}
			} else {
				got = len(v)
			}
			if got != wants[k] {
				t.Errorf("i=%d, k=%d: got %d, want %d", i, k, got, wants[k])
			}
		}
	}

	if err := d.Close(); err != nil {
		t.Fatalf("db Close: %v", err)
	}
}

@@ -132,6 +132,9 @@ func (vs *versionSet) load(dirname string, opts *db.Options) error {

// TODO: describe what this function does and how it interacts concurrently
// with a running leveldb.
//
// d.mu must be held when calling this, for the enclosing *DB d.
// TODO: actually pass d.mu, and drop and re-acquire it around the I/O.
func (vs *versionSet) logAndApply(dirname string, ve *versionEdit) error {
	if ve.logNumber != 0 {
		if ve.logNumber < vs.logNumber || vs.nextFileNumber <= ve.logNumber {
@@ -173,8 +176,12 @@ func (vs *versionSet) logAndApply(dirname string, ve *versionEdit) error {

	// Install the new version.
	vs.append(newVersion)
	vs.logNumber = ve.logNumber
	vs.prevLogNumber = ve.prevLogNumber
	if ve.logNumber != 0 {
		vs.logNumber = ve.logNumber
	}
	if ve.prevLogNumber != 0 {
		vs.prevLogNumber = ve.prevLogNumber
	}
	return nil
}