leveldb: implement table cache.

R=bradfitz
CC=golang-dev
https://codereview.appspot.com/14960043
This commit is contained in:
Nigel Tao 2013-10-22 14:24:29 +11:00
Родитель cc001bdde3
Коммит e2e3269e0b
6 изменённых файлов: 449 добавлений и 32 удалений

Просмотреть файл

@ -26,6 +26,7 @@ const (
// Read/Write options:
// - Comparer
// - FileSystem
// - MaxOpenFiles
// Read options:
// - VerifyChecksums
// Write options:
@ -61,6 +62,12 @@ type Options struct {
// The default value uses the underlying operating system's file system.
FileSystem FileSystem
// MaxOpenFiles is a soft limit on the number of open files that can be
// used by the DB.
//
// The default value is 1000.
MaxOpenFiles int
// VerifyChecksums is whether to verify the per-block checksums in a DB.
//
// The default value is false.
@ -103,6 +110,13 @@ func (o *Options) GetFileSystem() FileSystem {
return o.FileSystem
}
// GetMaxOpenFiles returns the configured soft limit on open files. A nil
// receiver or a zero MaxOpenFiles field yields the default of 1000.
func (o *Options) GetMaxOpenFiles() int {
	const defaultMaxOpenFiles = 1000
	if o != nil && o.MaxOpenFiles != 0 {
		return o.MaxOpenFiles
	}
	return defaultMaxOpenFiles
}
func (o *Options) GetVerifyChecksums() bool {
if o == nil {
return false

Просмотреть файл

@ -26,6 +26,13 @@ const (
// l0CompactionTrigger is the number of files at which level-0 compaction
// starts.
l0CompactionTrigger = 4
// minTableCacheSize is the minimum size of the table cache.
minTableCacheSize = 64
// numNonTableCacheFiles approximates how many of the MaxOpenFiles open
// files are used for things other than the table cache.
numNonTableCacheFiles = 10
)
// TODO: document DB.
@ -36,6 +43,8 @@ type DB struct {
// icmpOpts is a copy of opts that overrides the Comparer to be icmp.
icmpOpts db.Options
tableCache tableCache
// TODO: describe exactly what this mutex protects. So far: every field
// below.
mu sync.Mutex
@ -72,7 +81,7 @@ func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) {
if mem == nil {
continue
}
value, conclusive, err := internalGet(mem, d.icmp.userCmp, ikey, opts)
value, conclusive, err := internalGet(mem.Find(ikey, opts), d.icmp.userCmp, key)
if conclusive {
return value, err
}
@ -80,7 +89,7 @@ func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) {
// TODO: update stats, maybe schedule compaction.
return current.get(ikey, d, d.icmp.userCmp, opts)
return current.get(ikey, &d.tableCache, d.icmp.userCmp, opts)
}
func (d *DB) Set(key, value []byte, opts *db.WriteOptions) error {
@ -152,12 +161,13 @@ func (d *DB) Find(key []byte, opts *db.ReadOptions) db.Iterator {
}
func (d *DB) Close() error {
err := d.tableCache.Close()
d.mu.Lock()
defer d.mu.Unlock()
if d.fileLock == nil {
return nil
return err
}
err := d.fileLock.Close()
err = firstError(err, d.fileLock.Close())
d.fileLock = nil
return err
}
@ -184,6 +194,11 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
d.icmpOpts = *opts
}
d.icmpOpts.Comparer = d.icmp
tableCacheSize := opts.GetMaxOpenFiles() - numNonTableCacheFiles
if tableCacheSize < minTableCacheSize {
tableCacheSize = minTableCacheSize
}
d.tableCache.init(dirname, opts.GetFileSystem(), &d.icmpOpts, tableCacheSize)
d.mem = memdb.New(&d.icmpOpts)
fs := opts.GetFileSystem()
@ -445,11 +460,3 @@ func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMeta
return meta, nil
}
func (d *DB) openTable(fileNum uint64) (db.DB, error) {
f, err := d.opts.GetFileSystem().Open(dbFilename(d.dirname, fileTypeTable, fileNum))
if err != nil {
return nil, err
}
return table.NewReader(f, &d.icmpOpts), nil
}

175
leveldb/table_cache.go Normal file
Просмотреть файл

@ -0,0 +1,175 @@
// Copyright 2013 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package leveldb
import (
"sync"
"code.google.com/p/leveldb-go/leveldb/db"
"code.google.com/p/leveldb-go/leveldb/table"
)
// tableCache keeps up to size table nodes, keyed by file number, and evicts
// the least recently found node when a new one pushes it over that limit.
type tableCache struct {
// dirname is combined with a file number to build each table's filename.
dirname string
// fs is the file system that table files are opened from.
fs db.FileSystem
// opts is passed to table.NewReader for every table that is loaded.
opts *db.Options
// size is the number of nodes above which findNode evicts the tail node.
size int
// mu is held while nodes, dummy, and the nodes' list links and refCounts
// are read or modified.
mu sync.Mutex
// nodes maps a table's file number to its cache node.
nodes map[uint64]*tableCacheNode
// dummy is the sentinel of a circular doubly-linked list of nodes:
// dummy.next is the most recently found node and dummy.prev is the tail
// that gets evicted first.
dummy tableCacheNode
}
// init prepares c for use: it records where and how tables are opened, the
// maximum number of cached nodes, and sets up an empty node map and an empty
// circular list whose sentinel points at itself.
func (c *tableCache) init(dirname string, fs db.FileSystem, opts *db.Options, size int) {
	c.dirname, c.fs, c.opts, c.size = dirname, fs, opts, size
	c.nodes = map[uint64]*tableCacheNode{}

	sentinel := &c.dummy
	sentinel.next, sentinel.prev = sentinel, sentinel
}
// find returns an iterator positioned by ikey over the table with the given
// file number, loading the table through the cache if necessary.
//
// findNode hands us one reference on the node. When the table loaded
// successfully, that reference is transferred to the returned tableCacheIter,
// which drops it on Close. When loading failed, the reference is dropped here,
// a fresh load is started (the failure may have been transient), and the
// load error is returned.
func (c *tableCache) find(fileNum uint64, ikey internalKey) (db.Iterator, error) {
	node := c.findNode(fileNum)
	loaded := <-node.result

	if loaded.err == nil {
		// Put the result back so that other users of the node can see it.
		node.result <- loaded
		return &tableCacheIter{
			Iterator: loaded.reader.Find(ikey, nil),
			cache:    c,
			node:     node,
		}, nil
	}

	// Loading failed: give up our reference straight away.
	c.mu.Lock()
	node.refCount--
	if node.refCount == 0 {
		go node.release()
	}
	c.mu.Unlock()

	// Refill the (now empty) result channel by trying the load again.
	go node.load(c)
	return nil, loaded.err
}
// findNode looks up the cache node for the table with the given file number
// and moves it to the front of the recency list. A missing node is created
// with one reference owned by the cache, its load is started in a new
// goroutine, and the tail of the list is evicted if the cache has grown past
// its size.
//
// The returned node carries one extra reference on behalf of the caller, who
// must decrement refCount when done with it.
func (c *tableCache) findNode(fileNum uint64) *tableCacheNode {
	c.mu.Lock()
	defer c.mu.Unlock()

	node := c.nodes[fileNum]
	if node != nil {
		// Already cached: unlink it so it can be re-inserted at the front.
		node.next.prev = node.prev
		node.prev.next = node.next
	} else {
		node = &tableCacheNode{
			fileNum:  fileNum,
			refCount: 1,
			result:   make(chan tableReaderOrError, 1),
		}
		c.nodes[fileNum] = node

		if len(c.nodes) > c.size {
			// Over capacity: drop the cache's reference to the tail node.
			victim := c.dummy.prev
			delete(c.nodes, victim.fileNum)
			victim.next.prev = victim.prev
			victim.prev.next = victim.next
			victim.refCount--
			if victim.refCount == 0 {
				go victim.release()
			}
		}
		go node.load(c)
	}

	// Link node in directly after the sentinel, i.e. at the front.
	head := &c.dummy
	node.next = head.next
	node.prev = head
	node.next.prev = node
	node.prev.next = node

	// This reference belongs to the caller.
	node.refCount++
	return node
}
// Close drops the cache's own reference on every cached node and empties the
// cache. A node whose reference count reaches zero has its table reader
// closed in a separate goroutine; nodes still referenced by open iterators
// are released when those iterators are closed.
//
// Close is safe to call more than once, and on a tableCache whose init was
// never called: in both cases the list head is nil and there is nothing to
// release. It always returns nil.
func (c *tableCache) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// The n != nil check guards against a nil list head (already closed or
	// never initialised), which would otherwise be dereferenced below.
	for n := c.dummy.next; n != nil && n != &c.dummy; n = n.next {
		n.refCount--
		if n.refCount == 0 {
			go n.release()
		}
	}
	c.nodes = nil
	c.dummy.next = nil
	c.dummy.prev = nil
	return nil
}
// tableReaderOrError is the outcome of loading a table: load sets reader on
// success and err when opening the file failed.
type tableReaderOrError struct {
reader *table.Reader
err error
}
// tableCacheNode is the cache entry for a single table file.
type tableCacheNode struct {
// fileNum is the file number of the table this node refers to.
fileNum uint64
// result carries the outcome of load. It is created with a buffer of one;
// find receives the value and, on success, sends it back for the next user.
result chan tableReaderOrError
// The remaining fields are protected by the tableCache mutex.
// next and prev link the node into the cache's doubly-linked list.
next, prev *tableCacheNode
// refCount counts the cache's own reference plus one per outstanding
// caller of findNode; release is started when it drops to zero.
refCount int
}
// load opens n's table file via the cache's file system and sends the outcome
// on n.result: either a table reader wrapping the opened file, or the error
// returned by Open.
func (n *tableCacheNode) load(c *tableCache) {
	name := dbFilename(c.dirname, fileTypeTable, n.fileNum)

	var outcome tableReaderOrError
	if file, err := c.fs.Open(name); err != nil {
		outcome.err = err
	} else {
		outcome.reader = table.NewReader(file, c.opts)
	}
	n.result <- outcome
}
// release waits for n's load result and, if a reader was produced, closes it.
// A failed load has nothing to close. The result is not put back on the
// channel, and any error from closing the reader is discarded.
func (n *tableCacheNode) release() {
	if outcome := <-n.result; outcome.err == nil {
		outcome.reader.Close()
	}
}
// tableCacheIter wraps an iterator over a cached table and holds one
// reference on the table's cache node until the iterator is closed.
type tableCacheIter struct {
db.Iterator
// cache is the tableCache whose mutex protects node's refCount.
cache *tableCache
// node is the cache node this iterator holds a reference on.
node *tableCacheNode
// closeErr is the result of the first Close call, returned again by any
// later Close calls.
closeErr error
// closed records whether Close has already been called.
closed bool
}
// Close drops this iterator's reference on its cache node, starting the
// node's release if that was the last reference, and then closes the wrapped
// iterator. Only the first call does any work; later calls return the error
// recorded by the first.
func (i *tableCacheIter) Close() error {
	if !i.closed {
		i.closed = true

		mu := &i.cache.mu
		mu.Lock()
		i.node.refCount--
		if i.node.refCount == 0 {
			go i.node.release()
		}
		mu.Unlock()

		i.closeErr = i.Iterator.Close()
	}
	return i.closeErr
}

224
leveldb/table_cache_test.go Normal file
Просмотреть файл

@ -0,0 +1,224 @@
// Copyright 2013 The LevelDB-Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package leveldb
import (
"bytes"
"fmt"
"math/rand"
"sync"
"testing"
"time"
"code.google.com/p/leveldb-go/leveldb/db"
"code.google.com/p/leveldb-go/leveldb/memfs"
"code.google.com/p/leveldb-go/leveldb/table"
)
// tableCacheTestFile wraps a db.File so that closing it is recorded in the
// owning tableCacheTestFS.
type tableCacheTestFile struct {
db.File
// fs is the test file system whose closeCounts is updated on Close.
fs *tableCacheTestFS
// name is the name the file was opened under; it keys closeCounts.
name string
}
// Close records one more close for f's name in the owning file system (when
// counting is enabled, i.e. closeCounts is non-nil) and then closes the
// wrapped file.
func (f *tableCacheTestFile) Close() error {
	owner := f.fs
	owner.mu.Lock()
	if counts := owner.closeCounts; counts != nil {
		counts[f.name]++
	}
	owner.mu.Unlock()

	return f.File.Close()
}
// tableCacheTestFS wraps a db.FileSystem and counts, per file name, how many
// times files are opened and closed.
type tableCacheTestFS struct {
db.FileSystem
// mu is held while openCounts and closeCounts are read or modified.
mu sync.Mutex
// openCounts maps a file name to the number of Open calls for it.
// Counting is skipped while the map is nil.
openCounts map[string]int
// closeCounts maps a file name to the number of Close calls on files
// opened under it. Counting is skipped while the map is nil.
closeCounts map[string]int
}
// Open records one more open attempt for name (when counting is enabled, i.e.
// openCounts is non-nil) and opens the file in the wrapped file system. The
// attempt is counted even if the underlying Open fails. On success the file
// is wrapped so that its Close is counted too.
func (fs *tableCacheTestFS) Open(name string) (db.File, error) {
	fs.mu.Lock()
	if counts := fs.openCounts; counts != nil {
		counts[name]++
	}
	fs.mu.Unlock()

	inner, err := fs.FileSystem.Open(name)
	if err != nil {
		return nil, err
	}
	return &tableCacheTestFile{File: inner, fs: fs, name: name}, nil
}
// validate checks the recorded open/close counts in two phases.
//
// Phase one, with the cache still open: every table must have been closed
// either as often as it was opened or exactly once less, and at most
// tableCacheTestCacheSize tables may still be open. If f is non-nil it is
// called for every table index with that table's open and close counts.
//
// Phase two, after closing c: every table must have been closed exactly as
// often as it was opened.
//
// Before each phase validate sleeps briefly to give clean-up goroutines a
// chance to run.
func (fs *tableCacheTestFS) validate(t *testing.T, c *tableCache, f func(i, gotO, gotC int)) {
	// Let any clean-up goroutines do their work.
	time.Sleep(1 * time.Millisecond)

	fs.mu.Lock()
	defer fs.mu.Unlock()

	// countsFor must be called with fs.mu held.
	countsFor := func(i int) (opened, closed int) {
		name := dbFilename("", fileTypeTable, uint64(i))
		return fs.openCounts[name], fs.closeCounts[name]
	}

	stillOpen := 0
	for i := 0; i < tableCacheTestNumTables; i++ {
		gotO, gotC := countsFor(i)
		if gotO > gotC {
			stillOpen++
		}
		if pending := gotO - gotC; pending != 0 && pending != 1 {
			t.Errorf("i=%d: table closed too many or too few times: opened %d times, closed %d times", i, gotO, gotC)
		}
		if f != nil {
			f(i, gotO, gotC)
		}
	}
	if stillOpen > tableCacheTestCacheSize {
		t.Errorf("numStillOpen is %d, want <= %d", stillOpen, tableCacheTestCacheSize)
	}

	// Close the tableCache and let any clean-up goroutines do their work.
	// The mutex is released meanwhile because closing files takes it; it is
	// re-acquired afterwards so the deferred Unlock stays balanced.
	fs.mu.Unlock()
	c.Close()
	time.Sleep(1 * time.Millisecond)
	fs.mu.Lock()

	for i := 0; i < tableCacheTestNumTables; i++ {
		if gotO, gotC := countsFor(i); gotO != gotC {
			t.Errorf("i=%d: opened %d times, closed %d times", i, gotO, gotC)
		}
	}
}
const (
// tableCacheTestNumTables is the number of table files that newTableCache
// creates, with file numbers 0 through tableCacheTestNumTables-1.
tableCacheTestNumTables = 300
// tableCacheTestCacheSize is the size newTableCache passes to the table
// cache's init, and the upper bound validate puts on still-open tables.
tableCacheTestCacheSize = 100
)
// newTableCache builds the fixture shared by the table cache tests: an
// in-memory file system holding tableCacheTestNumTables table files, and a
// tableCache of size tableCacheTestCacheSize reading from it.
//
// Table i contains the single key "k.SET.<i>" whose value is i bytes long.
// Open/close counting on the file system is switched on only after all tables
// have been written, so the counts reflect cache activity alone.
func newTableCache() (*tableCache, *tableCacheTestFS, error) {
	fs := &tableCacheTestFS{FileSystem: memfs.New()}
	payload := bytes.Repeat([]byte("x"), tableCacheTestNumTables)

	// writeTable creates table number i with its single key/value pair.
	writeTable := func(i int) error {
		file, err := fs.Create(dbFilename("", fileTypeTable, uint64(i)))
		if err != nil {
			return fmt.Errorf("fs.Create: %v", err)
		}
		w := table.NewWriter(file, &db.Options{
			Comparer: internalKeyComparer{userCmp: db.DefaultComparer},
		})
		if err := w.Set(makeIkey(fmt.Sprintf("k.SET.%d", i)), payload[:i], nil); err != nil {
			return fmt.Errorf("tw.Set: %v", err)
		}
		if err := w.Close(); err != nil {
			return fmt.Errorf("tw.Close: %v", err)
		}
		return nil
	}

	for i := 0; i < tableCacheTestNumTables; i++ {
		if err := writeTable(i); err != nil {
			return nil, nil, err
		}
	}

	// Start counting opens and closes from here on.
	fs.mu.Lock()
	fs.openCounts = make(map[string]int)
	fs.closeCounts = make(map[string]int)
	fs.mu.Unlock()

	cache := new(tableCache)
	cache.init("", fs, nil, tableCacheTestCacheSize)
	return cache, fs, nil
}
// testTableCacheRandomAccess performs 2000 lookups of pseudo-randomly chosen
// tables through a fresh table cache. Each lookup runs in its own goroutine
// and checks that the table yields exactly one entry whose value length
// equals the file number.
//
// When concurrent is false, each lookup is awaited before the next starts.
// When it is true, all lookups are started up front, each pausing for a
// random sub-millisecond duration between find and iteration, and their
// results are collected afterwards. Finally the file system's open/close
// counts are validated.
func testTableCacheRandomAccess(t *testing.T, concurrent bool) {
	const N = 2000
	c, fs, err := newTableCache()
	if err != nil {
		t.Fatal(err)
	}

	var rngMu sync.Mutex
	rng := rand.New(rand.NewSource(1))

	// lookup runs a single randomized find-and-iterate and reports the first
	// failed expectation, or nil.
	lookup := func(i int) error {
		rngMu.Lock()
		fileNum, sleepTime := rng.Intn(tableCacheTestNumTables), rng.Intn(1000)
		rngMu.Unlock()

		iter, err := c.find(uint64(fileNum), []byte("k"))
		if err != nil {
			return fmt.Errorf("i=%d, fileNum=%d: find: %v", i, fileNum, err)
		}
		if concurrent {
			time.Sleep(time.Duration(sleepTime) * time.Microsecond)
		}
		if !iter.Next() {
			return fmt.Errorf("i=%d, fileNum=%d: next.0: got false, want true", i, fileNum)
		}
		if got := len(iter.Value()); got != fileNum {
			return fmt.Errorf("i=%d, fileNum=%d: value: got %d bytes, want %d", i, fileNum, got, fileNum)
		}
		if iter.Next() {
			return fmt.Errorf("i=%d, fileNum=%d: next.1: got true, want false", i, fileNum)
		}
		if err := iter.Close(); err != nil {
			return fmt.Errorf("i=%d, fileNum=%d: close: %v", i, fileNum, err)
		}
		return nil
	}

	// errc is buffered for all N results so that no goroutine blocks on send.
	errc := make(chan error, N)
	for i := 0; i < N; i++ {
		go func(i int) {
			errc <- lookup(i)
		}(i)
		if concurrent {
			continue
		}
		if err := <-errc; err != nil {
			t.Fatal(err)
		}
	}
	if concurrent {
		for received := 0; received < N; received++ {
			if err := <-errc; err != nil {
				t.Fatal(err)
			}
		}
	}
	fs.validate(t, c, nil)
}
// TestTableCacheRandomAccessSequential runs the random access test with each
// lookup completing before the next one starts.
func TestTableCacheRandomAccessSequential(t *testing.T) {
	const concurrent = false
	testTableCacheRandomAccess(t, concurrent)
}
// TestTableCacheRandomAccessConcurrent runs the random access test with all
// lookups in flight at the same time.
func TestTableCacheRandomAccessConcurrent(t *testing.T) {
	const concurrent = true
	testTableCacheRandomAccess(t, concurrent)
}
// TestTableCacheFrequentlyUsed checks that tables which are looked up
// constantly are never evicted. Every round finds two pinned tables around
// one table that cycles through all file numbers. Afterwards the pinned
// tables must have been opened exactly once and never closed, while every
// other table must not have been opened exactly once.
func TestTableCacheFrequentlyUsed(t *testing.T) {
	const (
		N       = 1000
		pinned0 = 7
		pinned1 = 11
	)
	c, fs, err := newTableCache()
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < N; i++ {
		round := [...]int{pinned0, i % tableCacheTestNumTables, pinned1}
		for _, j := range round {
			iter, err := c.find(uint64(j), nil)
			if err != nil {
				t.Fatalf("i=%d, j=%d: find: %v", i, j, err)
			}
			if err := iter.Close(); err != nil {
				t.Fatalf("i=%d, j=%d: close: %v", i, j, err)
			}
		}
	}

	fs.validate(t, c, func(i, gotO, gotC int) {
		switch {
		case i == pinned0 || i == pinned1:
			if gotO != 1 || gotC != 0 {
				t.Errorf("i=%d: pinned table: got %d, %d, want %d, %d", i, gotO, gotC, 1, 0)
			}
		case gotO == 1:
			t.Errorf("i=%d: table only opened once", i)
		}
	})
}

Просмотреть файл

@ -205,8 +205,9 @@ func (v *version) checkOrdering(icmp db.Comparer) error {
return nil
}
type tableOpener interface {
openTable(fileNum uint64) (db.DB, error)
// tableIkeyFinder finds the given ikey in the table of the given file number.
type tableIkeyFinder interface {
find(fileNum uint64, ikey internalKey) (db.Iterator, error)
}
// get looks up the internal key ikey0 in v's tables such that ikey and ikey0
@ -216,7 +217,7 @@ type tableOpener interface {
// If ikey0's kind is set, the value for that previous set action is returned.
// If ikey0's kind is delete, the db.ErrNotFound error is returned.
// If there is no such ikey0, the db.ErrNotFound error is returned.
func (v *version) get(ikey internalKey, tOpener tableOpener, ucmp db.Comparer, ro *db.ReadOptions) ([]byte, error) {
func (v *version) get(ikey internalKey, tiFinder tableIkeyFinder, ucmp db.Comparer, ro *db.ReadOptions) ([]byte, error) {
ukey := ikey.ukey()
// Iterate through v's tables, calling internalGet if the table's bounds
// might contain ikey. Due to the order in which we search the tables, and
@ -240,12 +241,11 @@ func (v *version) get(ikey internalKey, tOpener tableOpener, ucmp db.Comparer, r
if icmp.Compare(ikey, f.largest) > 0 {
continue
}
tab, err := tOpener.openTable(f.fileNum)
iter, err := tiFinder.find(f.fileNum, ikey)
if err != nil {
return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
}
value, conclusive, err := internalGet(tab, ucmp, ikey, ro)
tab.Close()
value, conclusive, err := internalGet(iter, ucmp, ukey)
if conclusive {
return value, err
}
@ -268,12 +268,11 @@ func (v *version) get(ikey internalKey, tOpener tableOpener, ucmp db.Comparer, r
if ucmp.Compare(ukey, f.smallest.ukey()) < 0 {
continue
}
tab, err := tOpener.openTable(f.fileNum)
iter, err := tiFinder.find(f.fileNum, ikey)
if err != nil {
return nil, fmt.Errorf("leveldb: could not open table %d: %v", f.fileNum, err)
}
value, conclusive, err := internalGet(tab, ucmp, ikey, ro)
tab.Close()
value, conclusive, err := internalGet(iter, ucmp, ukey)
if conclusive {
return value, err
}
@ -291,10 +290,7 @@ func (v *version) get(ikey internalKey, tOpener tableOpener, ucmp db.Comparer, r
// * if that pair's key's kind is set, that pair's value will be returned,
// * if that pair's key's kind is delete, db.ErrNotFound will be returned.
// If the returned error is non-nil then conclusive will be true.
func internalGet(d db.DB, ucmp db.Comparer, ikey internalKey, ro *db.ReadOptions) (
value []byte, conclusive bool, err error) {
t := d.Find(ikey, ro)
func internalGet(t db.Iterator, ucmp db.Comparer, ukey []byte) (value []byte, conclusive bool, err error) {
if !t.Next() {
err = t.Close()
return nil, err != nil, err
@ -304,7 +300,7 @@ func internalGet(d db.DB, ucmp db.Comparer, ikey internalKey, ro *db.ReadOptions
t.Close()
return nil, true, fmt.Errorf("leveldb: corrupt table: invalid internal key")
}
if ucmp.Compare(ikey.ukey(), ikey0.ukey()) != 0 {
if ucmp.Compare(ukey, ikey0.ukey()) != 0 {
err = t.Close()
return nil, err != nil, err
}

Просмотреть файл

@ -81,10 +81,10 @@ func TestIkeyRange(t *testing.T) {
}
}
type tableOpenerFunc func(fileNum uint64) (db.DB, error)
type tableIkeyFinderFunc func(fileNum uint64, ikey internalKey) (db.Iterator, error)
func (f tableOpenerFunc) openTable(fileNum uint64) (db.DB, error) {
return f(fileNum)
func (f tableIkeyFinderFunc) find(fileNum uint64, ikey internalKey) (db.Iterator, error) {
return f(fileNum, ikey)
}
var makeIkeyKinds = map[string]internalKeyKind{
@ -517,12 +517,12 @@ func TestVersion(t *testing.T) {
// m is a map from file numbers to DBs.
m := map[uint64]db.DB{}
tableOpener := tableOpenerFunc(func(fileNum uint64) (db.DB, error) {
tiFinder := tableIkeyFinderFunc(func(fileNum uint64, ikey internalKey) (db.Iterator, error) {
d, ok := m[fileNum]
if !ok {
return nil, errors.New("no such file")
}
return d, nil
return d.Find(ikey, nil), nil
})
v := version{}
@ -530,6 +530,7 @@ func TestVersion(t *testing.T) {
d := memdb.New(&db.Options{
Comparer: icmp,
})
defer d.Close()
m[tt.fileNum] = d
var smallest, largest internalKey
@ -572,7 +573,7 @@ func TestVersion(t *testing.T) {
for _, query := range tc.queries {
s := strings.Split(query, " ")
value, err := v.get(makeIkey(s[0]), tableOpener, db.DefaultComparer, nil)
value, err := v.get(makeIkey(s[0]), tiFinder, db.DefaultComparer, nil)
got, want := "", s[1]
if err != nil {
if err != db.ErrNotFound {