// Copyright 2012 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package leveldb provides an ordered key/value store.
//
// BUG: This package is incomplete.
package leveldb // import "github.com/golang/leveldb"
|
2012-02-07 04:17:57 +04:00
|
|
|
|
|
|
|
import (
|
2012-05-07 14:50:35 +04:00
|
|
|
"bytes"
|
2013-02-18 07:16:43 +04:00
|
|
|
"errors"
|
2012-05-07 14:50:35 +04:00
|
|
|
"fmt"
|
2012-05-03 09:06:52 +04:00
|
|
|
"io"
|
2013-10-30 05:53:40 +04:00
|
|
|
"os"
|
2012-05-07 14:50:35 +04:00
|
|
|
"path/filepath"
|
|
|
|
"sort"
|
2013-02-01 10:26:35 +04:00
|
|
|
"sync"
|
2013-10-25 04:11:20 +04:00
|
|
|
"time"
|
2012-05-03 09:06:52 +04:00
|
|
|
|
2015-07-20 11:02:55 +03:00
|
|
|
"github.com/golang/leveldb/db"
|
|
|
|
"github.com/golang/leveldb/memdb"
|
|
|
|
"github.com/golang/leveldb/record"
|
|
|
|
"github.com/golang/leveldb/table"
|
2012-02-07 04:17:57 +04:00
|
|
|
)
|
2012-05-03 09:06:52 +04:00
|
|
|
|
2013-04-17 03:58:01 +04:00
|
|
|
// Compaction and file-handle tuning knobs. The level-0 trigger values mirror
// the C++ LevelDB defaults: level-0 files may overlap, so every extra file
// adds read amplification, and writes are first slowed and then stopped as
// the count grows.
const (
	// l0CompactionTrigger is the number of files at which level-0 compaction
	// starts.
	l0CompactionTrigger = 4

	// l0SlowdownWritesTrigger is the soft limit on number of level-0 files.
	// We slow down writes at this point.
	l0SlowdownWritesTrigger = 8

	// l0StopWritesTrigger is the maximum number of level-0 files. We stop
	// writes at this point.
	l0StopWritesTrigger = 12

	// minTableCacheSize is the minimum size of the table cache.
	minTableCacheSize = 64

	// numNonTableCacheFiles is an approximation for the number of MaxOpenFiles
	// that we don't use for table caches.
	numNonTableCacheFiles = 10
)
|
|
|
|
|
2012-05-03 09:06:52 +04:00
|
|
|
// DB is an ordered key/value store backed by a write-ahead log, in-memory
// memtables, and on-disk sorted tables. It satisfies the db.DB interface.
//
// TODO: document DB.
type DB struct {
	// dirname is the directory holding all of the database's files.
	dirname string
	// opts are the options the DB was opened with.
	opts *db.Options
	// icmp orders internal keys (user key plus sequence number and kind).
	icmp internalKeyComparer
	// icmpOpts is a copy of opts that overrides the Comparer to be icmp.
	icmpOpts db.Options

	// tableCache caches open on-disk table readers.
	tableCache tableCache

	// TODO: describe exactly what this mutex protects. So far: every field
	// below.
	mu sync.Mutex

	// fileLock holds the lock on the database directory, taken in Open.
	fileLock io.Closer
	// logNumber, logFile and log are the current write-ahead log: its file
	// number, the underlying file, and the record writer wrapping it.
	logNumber uint64
	logFile   db.File
	log       *record.Writer

	// versions tracks the on-disk state (manifest, file numbers, sequence
	// numbers, live versions).
	versions versionSet

	// mem is non-nil and the MemDB pointed to is mutable. imm is possibly
	// nil, but if non-nil, the MemDB pointed to is immutable and will be
	// copied out as an on-disk table. mem's sequence numbers are all
	// higher than imm's, and imm's sequence numbers are all higher than
	// those on-disk.
	mem, imm *memdb.MemDB

	// compactionCond (with L == &mu) signals compaction state changes;
	// compacting reports whether a compaction goroutine is running.
	compactionCond sync.Cond
	compacting     bool

	// closed is set once Close has completed, making Close idempotent.
	closed bool

	// pendingOutputs is the set of file numbers for tables currently being
	// written, so deleteObsoleteFiles will not remove them before they are
	// recorded in versions.
	pendingOutputs map[uint64]struct{}
}
|
|
|
|
|
|
|
|
// Compile-time check that *DB implements the db.DB interface.
var _ db.DB = (*DB)(nil)
|
|
|
|
|
|
|
|
// Get returns the value for the given key, searching the mutable memtable
// first, then the immutable memtable (if any), and finally the on-disk tables
// of the current version. The lookup is made as of the DB's last committed
// sequence number.
func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) {
	// Take a consistent snapshot of the lookup state under d.mu, then drop
	// the lock before doing any (possibly slow) searching or disk I/O.
	d.mu.Lock()
	// TODO: add an opts.LastSequence field, or a DB.Snapshot method?
	snapshot := d.versions.lastSequence
	current := d.versions.currentVersion()
	// TODO: do we need to ref-count the current version, so that we don't
	// delete its underlying files if we have a concurrent compaction?
	memtables := [2]*memdb.MemDB{d.mem, d.imm}
	d.mu.Unlock()

	// internalKeyKindMax makes ikey sort before all entries for this user key
	// with sequence numbers <= snapshot.
	ikey := makeInternalKey(nil, key, internalKeyKindMax, snapshot)

	// Look in the memtables before going to the on-disk current version.
	// mem is newer than imm, so it is searched first; a conclusive result
	// (found, or found deleted) ends the search.
	for _, mem := range memtables {
		if mem == nil {
			continue
		}
		value, conclusive, err := internalGet(mem.Find(ikey, opts), d.icmp.userCmp, key)
		if conclusive {
			return value, err
		}
	}

	// TODO: update stats, maybe schedule compaction.

	return current.get(ikey, &d.tableCache, d.icmp.userCmp, opts)
}
|
|
|
|
|
|
|
|
func (d *DB) Set(key, value []byte, opts *db.WriteOptions) error {
|
2013-02-18 07:16:43 +04:00
|
|
|
var batch Batch
|
|
|
|
batch.Set(key, value)
|
|
|
|
return d.Apply(batch, opts)
|
2012-05-03 09:06:52 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (d *DB) Delete(key []byte, opts *db.WriteOptions) error {
|
2013-02-18 07:16:43 +04:00
|
|
|
var batch Batch
|
|
|
|
batch.Delete(key)
|
|
|
|
return d.Apply(batch, opts)
|
2012-05-03 09:06:52 +04:00
|
|
|
}
|
|
|
|
|
2012-05-07 14:50:35 +04:00
|
|
|
// Apply atomically applies a batch of set and delete operations: the batch is
// assigned a contiguous run of sequence numbers, written to the write-ahead
// log (optionally flushed and synced), and then inserted into the mutable
// memtable. An empty batch is a no-op.
func (d *DB) Apply(batch Batch, opts *db.WriteOptions) error {
	if len(batch.data) == 0 {
		return nil
	}
	n := batch.count()
	if n == invalidBatchCount {
		return errors.New("leveldb: invalid batch")
	}

	d.mu.Lock()
	defer d.mu.Unlock()

	// Ensure the memtable has capacity; this may block on compaction and may
	// swap in a fresh memtable and log file.
	if err := d.makeRoomForWrite(false); err != nil {
		return err
	}

	// Reserve n sequence numbers for this batch. The batch's entries get
	// seqNum, seqNum+1, ..., seqNum+n-1.
	seqNum := d.versions.lastSequence + 1
	batch.setSeqNum(seqNum)
	d.versions.lastSequence += uint64(n)

	// Write the batch to the log.
	// TODO: drop and re-acquire d.mu around the I/O.
	w, err := d.log.Next()
	if err != nil {
		return fmt.Errorf("leveldb: could not create log entry: %v", err)
	}
	if _, err = w.Write(batch.data); err != nil {
		return fmt.Errorf("leveldb: could not write log entry: %v", err)
	}
	if opts.GetSync() {
		if err = d.log.Flush(); err != nil {
			return fmt.Errorf("leveldb: could not flush log entry: %v", err)
		}
		if err = d.logFile.Sync(); err != nil {
			return fmt.Errorf("leveldb: could not sync log entry: %v", err)
		}
	}

	// Apply the batch to the memtable. ikey is reused across iterations to
	// avoid re-allocating the internal key buffer.
	for iter, ikey := batch.iter(), internalKey(nil); ; seqNum++ {
		kind, ukey, value, ok := iter.next()
		if !ok {
			break
		}
		ikey = makeInternalKey(ikey, ukey, kind, seqNum)
		d.mem.Set(ikey, value, nil)
	}

	// After consuming the batch, seqNum should have advanced exactly n steps;
	// anything else means batch.count() disagreed with its contents.
	if seqNum != d.versions.lastSequence+1 {
		panic("leveldb: inconsistent batch count")
	}
	return nil
}
|
|
|
|
|
2012-05-03 09:06:52 +04:00
|
|
|
// Find returns an iterator over the DB's key/value pairs, positioned relative
// to the given key.
//
// TODO: implement; this currently panics unconditionally.
func (d *DB) Find(key []byte, opts *db.ReadOptions) db.Iterator {
	panic("unimplemented")
}
|
|
|
|
|
|
|
|
// Close waits for any in-progress compaction to finish, then closes the table
// cache, the write-ahead log and its file, and releases the directory lock.
// Close is idempotent: subsequent calls return nil. The first error
// encountered is returned, but all resources are still closed.
func (d *DB) Close() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.closed {
		return nil
	}
	// Wait for any running compaction; compactionCond is signalled when the
	// compacting flag changes.
	for d.compacting {
		d.compactionCond.Wait()
	}
	err := d.tableCache.Close()
	err = firstError(err, d.log.Close())
	err = firstError(err, d.logFile.Close())
	err = firstError(err, d.fileLock.Close())
	d.closed = true
	return err
}
|
|
|
|
|
2012-05-07 14:50:35 +04:00
|
|
|
// fileNumAndName pairs a file's number with its base name, so log files can
// be sorted and replayed in file-number order.
type fileNumAndName struct {
	num  uint64
	name string
}

// fileNumAndNameSlice implements sort.Interface, ordering by file number.
type fileNumAndNameSlice []fileNumAndName

func (p fileNumAndNameSlice) Len() int           { return len(p) }
func (p fileNumAndNameSlice) Less(i, j int) bool { return p[i].num < p[j].num }
func (p fileNumAndNameSlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
|
|
|
|
|
2013-10-30 05:53:40 +04:00
|
|
|
// createDB initializes a fresh database in dirname: it writes an initial
// manifest (MANIFEST-000001) containing a single version edit, then points
// the CURRENT file at it. On error, the partially written manifest is
// removed.
func createDB(dirname string, opts *db.Options) (retErr error) {
	const manifestFileNum = 1
	ve := versionEdit{
		comparatorName: opts.GetComparer().Name(),
		nextFileNumber: manifestFileNum + 1,
	}
	manifestFilename := dbFilename(dirname, fileTypeManifest, manifestFileNum)
	f, err := opts.GetFileSystem().Create(manifestFilename)
	if err != nil {
		return fmt.Errorf("leveldb: could not create %q: %v", manifestFilename, err)
	}
	// Defer order matters: f.Close() (registered last) runs before the
	// Remove-on-error cleanup.
	defer func() {
		if retErr != nil {
			opts.GetFileSystem().Remove(manifestFilename)
		}
	}()
	defer f.Close()

	recWriter := record.NewWriter(f)
	w, err := recWriter.Next()
	if err != nil {
		return err
	}
	err = ve.encode(w)
	if err != nil {
		return err
	}
	err = recWriter.Close()
	if err != nil {
		return err
	}
	// Atomically install the new manifest as the current one.
	return setCurrentFile(dirname, opts.GetFileSystem(), manifestFileNum)
}
|
|
|
|
|
2012-05-03 09:06:52 +04:00
|
|
|
// Open opens a LevelDB whose files live in the given directory.
//
// The recovery sequence is: lock the directory, create the DB if it does not
// yet exist, load the version set from the manifest, replay any log files
// newer than the manifest, start a fresh log, and persist the resulting state
// in a new manifest before handing the DB to the caller.
func Open(dirname string, opts *db.Options) (*DB, error) {
	d := &DB{
		dirname:        dirname,
		opts:           opts,
		icmp:           internalKeyComparer{opts.GetComparer()},
		pendingOutputs: make(map[uint64]struct{}),
	}
	// NOTE(review): opts.GetComparer() above is called even when opts is nil,
	// so the Get* accessors presumably tolerate a nil receiver — confirm.
	if opts != nil {
		d.icmpOpts = *opts
	}
	d.icmpOpts.Comparer = d.icmp
	// Size the table cache from MaxOpenFiles, reserving some handles for
	// non-table files, but never below the minimum.
	tableCacheSize := opts.GetMaxOpenFiles() - numNonTableCacheFiles
	if tableCacheSize < minTableCacheSize {
		tableCacheSize = minTableCacheSize
	}
	d.tableCache.init(dirname, opts.GetFileSystem(), &d.icmpOpts, tableCacheSize)
	d.mem = memdb.New(&d.icmpOpts)
	d.compactionCond = sync.Cond{L: &d.mu}
	fs := opts.GetFileSystem()

	d.mu.Lock()
	defer d.mu.Unlock()

	// Lock the database directory.
	err := fs.MkdirAll(dirname, 0755)
	if err != nil {
		return nil, err
	}
	fileLock, err := fs.Lock(dbFilename(dirname, fileTypeLock, 0))
	if err != nil {
		return nil, err
	}
	// On any error path, release the directory lock; on success, ownership is
	// transferred to d.fileLock and fileLock is nil'ed out below.
	defer func() {
		if fileLock != nil {
			fileLock.Close()
		}
	}()

	// The presence of a CURRENT file distinguishes an existing DB from a
	// fresh directory.
	if _, err := fs.Stat(dbFilename(dirname, fileTypeCurrent, 0)); os.IsNotExist(err) {
		// Create the DB if it did not already exist.
		if err := createDB(dirname, opts); err != nil {
			return nil, err
		}
	} else if err != nil {
		return nil, fmt.Errorf("leveldb: database %q: %v", dirname, err)
	} else if opts.GetErrorIfDBExists() {
		return nil, fmt.Errorf("leveldb: database %q already exists", dirname)
	}

	// Load the version set.
	err = d.versions.load(dirname, opts)
	if err != nil {
		return nil, err
	}

	// Replay any newer log files than the ones named in the manifest.
	var ve versionEdit
	ls, err := fs.List(dirname)
	if err != nil {
		return nil, err
	}
	var logFiles fileNumAndNameSlice
	for _, filename := range ls {
		ft, fn, ok := parseDBFilename(filename)
		if ok && ft == fileTypeLog && (fn >= d.versions.logNumber || fn == d.versions.prevLogNumber) {
			logFiles = append(logFiles, fileNumAndName{fn, filename})
		}
	}
	// Replay in file-number order so later writes overwrite earlier ones.
	sort.Sort(logFiles)
	for _, lf := range logFiles {
		maxSeqNum, err := d.replayLogFile(&ve, fs, filepath.Join(dirname, lf.name))
		if err != nil {
			return nil, err
		}
		d.versions.markFileNumUsed(lf.num)
		if d.versions.lastSequence < maxSeqNum {
			d.versions.lastSequence = maxSeqNum
		}
	}

	// Create an empty .log file.
	ve.logNumber = d.versions.nextFileNum()
	d.logNumber = ve.logNumber
	logFile, err := fs.Create(dbFilename(dirname, fileTypeLog, ve.logNumber))
	if err != nil {
		return nil, err
	}
	// As with fileLock: close the new log file on error paths only.
	defer func() {
		if logFile != nil {
			logFile.Close()
		}
	}()
	d.log = record.NewWriter(logFile)

	// Write a new manifest to disk.
	if err := d.versions.logAndApply(dirname, &ve); err != nil {
		return nil, err
	}

	d.deleteObsoleteFiles()
	d.maybeScheduleCompaction()

	// Success: transfer ownership so the error-path defers become no-ops.
	d.logFile, logFile = logFile, nil
	d.fileLock, fileLock = fileLock, nil
	return d, nil
}
|
2012-05-07 14:50:35 +04:00
|
|
|
|
2013-10-25 04:11:20 +04:00
|
|
|
// replayLogFile replays the edits in the named log file.
//
// Each log record holds one batch; the batch's entries are re-inserted into a
// scratch memtable with their original sequence numbers. If anything was
// replayed, the memtable is flushed to a level-0 table recorded in ve. The
// returned maxSeqNum is one past the highest sequence number seen.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) replayLogFile(ve *versionEdit, fs db.FileSystem, filename string) (maxSeqNum uint64, err error) {
	file, err := fs.Open(filename)
	if err != nil {
		return 0, err
	}
	defer file.Close()

	var (
		// mem accumulates the replayed entries; created lazily on the first
		// record so an empty log allocates nothing.
		mem      *memdb.MemDB
		batchBuf = new(bytes.Buffer)
		// ikey is a reusable internal-key buffer.
		ikey = make(internalKey, 512)
		rr   = record.NewReader(file)
	)
	for {
		r, err := rr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return 0, err
		}
		_, err = io.Copy(batchBuf, r)
		if err != nil {
			return 0, err
		}

		if batchBuf.Len() < batchHeaderLen {
			return 0, fmt.Errorf("leveldb: corrupt log file %q", filename)
		}
		// Reinterpret the raw record as a batch; [seqNum, seqNum1) is the
		// range of sequence numbers it occupies.
		b := Batch{batchBuf.Bytes()}
		seqNum := b.seqNum()
		seqNum1 := seqNum + uint64(b.count())
		if maxSeqNum < seqNum1 {
			maxSeqNum = seqNum1
		}

		if mem == nil {
			mem = memdb.New(&d.icmpOpts)
		}

		t := b.iter()
		for ; seqNum != seqNum1; seqNum++ {
			kind, ukey, value, ok := t.next()
			if !ok {
				return 0, fmt.Errorf("leveldb: corrupt log file %q", filename)
			}
			// Convert seqNum, kind and key into an internalKey, and add that ikey/value
			// pair to mem.
			//
			// TODO: instead of copying to an intermediate buffer (ikey), is it worth
			// adding a SetTwoPartKey(db.TwoPartKey{key0, key1}, value, opts) method to
			// memdb.MemDB? What effect does that have on the db.Comparer interface?
			//
			// The C++ LevelDB code does not need an intermediate copy because its memdb
			// implementation is a private implementation detail, and copies each internal
			// key component from the Batch format straight to the skiplist buffer.
			//
			// Go's LevelDB considers the memdb functionality to be useful in its own
			// right, and so leveldb/memdb is a separate package that is usable without
			// having to import the top-level leveldb package. That extra abstraction
			// means that we need to copy to an intermediate buffer here, to reconstruct
			// the complete internal key to pass to the memdb.
			ikey = makeInternalKey(ikey, ukey, kind, seqNum)
			mem.Set(ikey, value, nil)
		}
		// The iterator must be exactly exhausted; leftovers mean the batch
		// header's count disagreed with its payload.
		if len(t) != 0 {
			return 0, fmt.Errorf("leveldb: corrupt log file %q", filename)
		}

		// TODO: if mem is large enough, write it to a level-0 table and set mem = nil.

		batchBuf.Reset()
	}

	if mem != nil && !mem.Empty() {
		meta, err := d.writeLevel0Table(fs, mem)
		if err != nil {
			return 0, err
		}
		ve.newFiles = append(ve.newFiles, newFileEntry{level: 0, meta: meta})
		// Strictly speaking, it's too early to delete meta.fileNum from d.pendingOutputs,
		// but we are replaying the log file, which happens before Open returns, so there
		// is no possibility of deleteObsoleteFiles being called concurrently here.
		delete(d.pendingOutputs, meta.fileNum)
	}

	return maxSeqNum, nil
}
|
2012-08-05 16:34:10 +04:00
|
|
|
|
|
|
|
// firstError returns the first non-nil error of err0 and err1, or nil if both
|
|
|
|
// are nil.
|
|
|
|
func firstError(err0, err1 error) error {
|
|
|
|
if err0 != nil {
|
|
|
|
return err0
|
|
|
|
}
|
|
|
|
return err1
|
|
|
|
}
|
|
|
|
|
2013-10-25 04:11:20 +04:00
|
|
|
// writeLevel0Table writes a memtable to a level-0 on-disk table.
//
// If no error is returned, it adds the file number of that on-disk table to
// d.pendingOutputs. It is the caller's responsibility to remove that fileNum
// from that set when it has been applied to d.versions.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMetadata, err error) {
	meta.fileNum = d.versions.nextFileNum()
	filename := dbFilename(d.dirname, fileTypeTable, meta.fileNum)
	// Register the in-flight output so deleteObsoleteFiles won't remove it;
	// un-register it on failure.
	d.pendingOutputs[meta.fileNum] = struct{}{}
	defer func(fileNum uint64) {
		if err != nil {
			delete(d.pendingOutputs, fileNum)
		}
	}(meta.fileNum)

	// Release the d.mu lock while doing I/O.
	// Note the unusual order: Unlock and then Lock.
	d.mu.Unlock()
	defer d.mu.Lock()

	var (
		file db.File
		tw   *table.Writer
		iter db.Iterator
	)
	// Cleanup: close whatever is still open (each variable is nil'ed once it
	// has been closed on the happy path), and on error remove the partial
	// table file and zero out the metadata.
	defer func() {
		if iter != nil {
			err = firstError(err, iter.Close())
		}
		if tw != nil {
			err = firstError(err, tw.Close())
		}
		if file != nil {
			err = firstError(err, file.Close())
		}
		if err != nil {
			fs.Remove(filename)
			meta = fileMetadata{}
		}
	}()

	file, err = fs.Create(filename)
	if err != nil {
		return fileMetadata{}, err
	}
	tw = table.NewWriter(file, &db.Options{
		Comparer: d.icmp,
	})

	// The memtable iterates in sorted internal-key order, so the first key is
	// the smallest and the last key is the largest.
	iter = mem.Find(nil, nil)
	iter.Next()
	meta.smallest = internalKey(iter.Key()).clone()
	for {
		meta.largest = iter.Key()
		if err1 := tw.Set(meta.largest, iter.Value(), nil); err1 != nil {
			return fileMetadata{}, err1
		}
		if !iter.Next() {
			break
		}
	}
	// meta.largest still aliases the iterator's buffer; copy it out.
	meta.largest = meta.largest.clone()

	if err1 := iter.Close(); err1 != nil {
		iter = nil
		return fileMetadata{}, err1
	}
	iter = nil

	if err1 := tw.Close(); err1 != nil {
		tw = nil
		return fileMetadata{}, err1
	}
	tw = nil

	// TODO: currently, closing a table.Writer closes its underlying file.
	// We have to re-open the file to Sync or Stat it, which seems stupid.
	file, err = fs.Open(filename)
	if err != nil {
		return fileMetadata{}, err
	}

	if err1 := file.Sync(); err1 != nil {
		return fileMetadata{}, err1
	}

	if stat, err1 := file.Stat(); err1 != nil {
		return fileMetadata{}, err1
	} else {
		size := stat.Size()
		if size < 0 {
			return fileMetadata{}, fmt.Errorf("leveldb: table file %q has negative size %d", filename, size)
		}
		meta.size = uint64(size)
	}

	// TODO: compaction stats.

	return meta, nil
}
|
2013-10-25 04:11:20 +04:00
|
|
|
|
|
|
|
// makeRoomForWrite ensures that there is room in d.mem for the next write.
//
// It loops until one of: the current memtable has space; the previous
// memtable finishes compacting; or it swaps in a fresh memtable (and a fresh
// write-ahead log), scheduling compaction of the old one. If force is true,
// a swap happens even when the current memtable has room.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) makeRoomForWrite(force bool) error {
	allowDelay := !force
	for {
		// TODO: check any previous sticky error, if the paranoid option is set.

		if allowDelay && len(d.versions.currentVersion().files[0]) > l0SlowdownWritesTrigger {
			// We are getting close to hitting a hard limit on the number of
			// L0 files. Rather than delaying a single write by several
			// seconds when we hit the hard limit, start delaying each
			// individual write by 1ms to reduce latency variance.
			d.mu.Unlock()
			time.Sleep(1 * time.Millisecond)
			d.mu.Lock()
			// Only delay once per call.
			allowDelay = false
			// TODO: how do we ensure we are still 'at the front of the writer queue'?
			continue
		}

		if !force && d.mem.ApproximateMemoryUsage() <= d.opts.GetWriteBufferSize() {
			// There is room in the current memtable.
			break
		}

		if d.imm != nil {
			// We have filled up the current memtable, but the previous
			// one is still being compacted, so we wait.
			d.compactionCond.Wait()
			continue
		}

		if len(d.versions.currentVersion().files[0]) > l0StopWritesTrigger {
			// There are too many level-0 files.
			d.compactionCond.Wait()
			continue
		}

		// Attempt to switch to a new memtable and a new log file, and trigger
		// compaction of the old memtable.
		// TODO: drop and re-acquire d.mu around the I/O.
		newLogNumber := d.versions.nextFileNum()
		newLogFile, err := d.opts.GetFileSystem().Create(dbFilename(d.dirname, fileTypeLog, newLogNumber))
		if err != nil {
			return err
		}
		newLog := record.NewWriter(newLogFile)
		// Close the old log before installing the new one; undo the partial
		// setup on error.
		if err := d.log.Close(); err != nil {
			newLogFile.Close()
			return err
		}
		if err := d.logFile.Close(); err != nil {
			newLog.Close()
			newLogFile.Close()
			return err
		}
		d.logNumber, d.logFile, d.log = newLogNumber, newLogFile, newLog
		d.imm, d.mem = d.mem, memdb.New(&d.icmpOpts)
		force = false
		d.maybeScheduleCompaction()
	}
	return nil
}
|
|
|
|
|
|
|
|
// deleteObsoleteFiles deletes those files that are no longer needed.
|
|
|
|
//
|
|
|
|
// d.mu must be held when calling this, but the mutex may be dropped and
|
|
|
|
// re-acquired during the course of this method.
|
2013-10-31 04:28:46 +04:00
|
|
|
func (d *DB) deleteObsoleteFiles() {
|
|
|
|
liveFileNums := map[uint64]struct{}{}
|
2013-11-05 04:56:40 +04:00
|
|
|
for fileNum := range d.pendingOutputs {
|
|
|
|
liveFileNums[fileNum] = struct{}{}
|
|
|
|
}
|
2013-10-31 04:28:46 +04:00
|
|
|
d.versions.addLiveFileNums(liveFileNums)
|
|
|
|
logNumber := d.versions.logNumber
|
|
|
|
manifestFileNumber := d.versions.manifestFileNumber
|
|
|
|
|
|
|
|
// Release the d.mu lock while doing I/O.
|
|
|
|
// Note the unusual order: Unlock and then Lock.
|
|
|
|
d.mu.Unlock()
|
|
|
|
defer d.mu.Lock()
|
|
|
|
|
|
|
|
fs := d.opts.GetFileSystem()
|
|
|
|
list, err := fs.List(d.dirname)
|
|
|
|
if err != nil {
|
|
|
|
// Ignore any filesystem errors.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for _, filename := range list {
|
|
|
|
fileType, fileNum, ok := parseDBFilename(filename)
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
keep := true
|
|
|
|
switch fileType {
|
|
|
|
case fileTypeLog:
|
|
|
|
// TODO: also look at prevLogNumber?
|
|
|
|
keep = fileNum >= logNumber
|
|
|
|
case fileTypeManifest:
|
|
|
|
keep = fileNum >= manifestFileNumber
|
2013-11-01 05:48:45 +04:00
|
|
|
case fileTypeTable, fileTypeOldFashionedTable:
|
2013-10-31 04:28:46 +04:00
|
|
|
_, keep = liveFileNums[fileNum]
|
|
|
|
}
|
|
|
|
if keep {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if fileType == fileTypeTable {
|
|
|
|
d.tableCache.evict(fileNum)
|
|
|
|
}
|
|
|
|
// Ignore any file system errors.
|
|
|
|
fs.Remove(filepath.Join(d.dirname, filename))
|
|
|
|
}
|
2013-10-25 04:11:20 +04:00
|
|
|
}
|