leveldb: implement DB.Get to read from on-disk tables. Reading from

in-memory tables is a TODO.

R=bradfitz
CC=golang-dev
https://codereview.appspot.com/7235072
This commit is contained in:
Nigel Tao 2013-02-01 17:26:35 +11:00
Родитель 46cbf1fad8
Коммит 81a1bca8b0
6 изменённых файлов: 140 добавлений и 54 удалений

Просмотреть файл

@ -37,6 +37,31 @@ const (
internalKeyKindMax internalKeyKind = 1 internalKeyKindMax internalKeyKind = 1
) )
// makeInternalKey makes an internalKey from a user key, a kind, and a sequence
// number. The return value may be a slice of dst if dst is large enough.
// Otherwise, it may be a slice of a newly allocated buffer. In any case, all
// of dst may be overwritten, not just dst[len(dst):cap(dst)].
func makeInternalKey(dst internalKey, ukey []byte, kind internalKeyKind, seqNum uint64) internalKey {
if len(dst) < len(ukey)+8 {
n := 256
for n < len(ukey)+8 {
n *= 2
}
dst = make(internalKey, n)
}
ikey := dst[:len(ukey)+8]
i := copy(ikey, ukey)
ikey[i+0] = uint8(kind)
ikey[i+1] = uint8(seqNum)
ikey[i+2] = uint8(seqNum >> 8)
ikey[i+3] = uint8(seqNum >> 16)
ikey[i+4] = uint8(seqNum >> 24)
ikey[i+5] = uint8(seqNum >> 32)
ikey[i+6] = uint8(seqNum >> 40)
ikey[i+7] = uint8(seqNum >> 48)
return ikey
}
// valid returns whether k is a valid internal key. // valid returns whether k is a valid internal key.
func (k internalKey) valid() bool { func (k internalKey) valid() bool {
i := len(k) - 8 i := len(k) - 8
@ -69,19 +94,6 @@ func (k internalKey) seqNum() uint64 {
return n return n
} }
// encodeTrailer encodes kind and seqNum into k's 8 byte trailer.
func (k internalKey) encodeTrailer(kind internalKeyKind, seqNum uint64) {
i := len(k) - 8
k[i+0] = uint8(kind)
k[i+1] = uint8(seqNum)
k[i+2] = uint8(seqNum >> 8)
k[i+3] = uint8(seqNum >> 16)
k[i+4] = uint8(seqNum >> 24)
k[i+5] = uint8(seqNum >> 32)
k[i+6] = uint8(seqNum >> 40)
k[i+7] = uint8(seqNum >> 48)
}
// clone returns an internalKey that has the same contents but is backed by a // clone returns an internalKey that has the same contents but is backed by a
// different array. // different array.
func (k internalKey) clone() internalKey { func (k internalKey) clone() internalKey {

Просмотреть файл

@ -11,8 +11,7 @@ import (
) )
func TestInternalKey(t *testing.T) { func TestInternalKey(t *testing.T) {
k := internalKey("foo????????") k := makeInternalKey(nil, []byte("foo"), 1, 0x08070605040302)
k.encodeTrailer(internalKeyKind(1), 0x08070605040302)
if got, want := string(k), "foo\x01\x02\x03\x04\x05\x06\x07\x08"; got != want { if got, want := string(k), "foo\x01\x02\x03\x04\x05\x06\x07\x08"; got != want {
t.Fatalf("k = %q want %q", got, want) t.Fatalf("k = %q want %q", got, want)
} }

Просмотреть файл

@ -13,6 +13,7 @@ import (
"io" "io"
"path/filepath" "path/filepath"
"sort" "sort"
"sync"
"code.google.com/p/leveldb-go/leveldb/db" "code.google.com/p/leveldb-go/leveldb/db"
"code.google.com/p/leveldb-go/leveldb/memdb" "code.google.com/p/leveldb-go/leveldb/memdb"
@ -25,18 +26,33 @@ type DB struct {
dirname string dirname string
opts *db.Options opts *db.Options
icmp internalKeyComparer icmp internalKeyComparer
// icmpOpts is a copy of opts that overrides the Comparer to be icmp.
icmpOpts db.Options
fileLock io.Closer fileLock io.Closer
logFile db.File logFile db.File
log *record.Writer log *record.Writer
// TODO: describe exactly what this mutex protects.
mu sync.Mutex
versions versionSet versions versionSet
} }
var _ db.DB = (*DB)(nil) var _ db.DB = (*DB)(nil)
func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) { func (d *DB) Get(key []byte, opts *db.ReadOptions) ([]byte, error) {
panic("unimplemented") d.mu.Lock()
// TODO: add an opts.LastSequence field, or a DB.Snapshot method?
snapshot := d.versions.lastSequence
current := d.versions.currentVersion()
// TODO: do we need to ref-count the current version, so that we don't
// delete its underlying files if we have a concurrent compaction?
d.mu.Unlock()
ikey := makeInternalKey(nil, key, internalKeyKindMax, snapshot)
// TODO: look in memtables before going to the on-disk current version.
// TODO: update stats, maybe schedule compaction.
return current.get(ikey, d, d.icmp.userCmp, opts)
} }
func (d *DB) Set(key, value []byte, opts *db.WriteOptions) error { func (d *DB) Set(key, value []byte, opts *db.WriteOptions) error {
@ -82,6 +98,10 @@ func Open(dirname string, opts *db.Options) (*DB, error) {
opts: opts, opts: opts,
icmp: internalKeyComparer{opts.GetComparer()}, icmp: internalKeyComparer{opts.GetComparer()},
} }
if opts != nil {
d.icmpOpts = *opts
}
d.icmpOpts.Comparer = d.icmp
fs := opts.GetFileSystem() fs := opts.GetFileSystem()
// Lock the database directory. // Lock the database directory.
@ -222,18 +242,8 @@ func (d *DB) replayLogFile(ve *versionEdit, fs db.FileSystem, filename string) (
// having to import the top-level leveldb package. That extra abstraction // having to import the top-level leveldb package. That extra abstraction
// means that we need to copy to an intermediate buffer here, to reconstruct // means that we need to copy to an intermediate buffer here, to reconstruct
// the complete internal key to pass to the memdb. // the complete internal key to pass to the memdb.
if n := len(ikeyBuf); n < len(key)+8 { ikey := makeInternalKey(ikeyBuf, key, kind, seqNum)
for { ikeyBuf = ikey[:cap(ikey)]
n *= 2
if n >= len(key)+8 {
break
}
}
ikeyBuf = make(internalKey, n)
}
ikey := ikeyBuf[:len(key)+8]
copy(ikey, key)
ikey.encodeTrailer(kind, seqNum)
mem.Set(ikey, value, nil) mem.Set(ikey, value, nil)
} }
if len(t) != 0 { if len(t) != 0 {
@ -353,3 +363,11 @@ func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMeta
return meta, nil return meta, nil
} }
func (d *DB) openTable(fileNum uint64) (db.DB, error) {
f, err := d.opts.GetFileSystem().Open(dbFilename(d.dirname, fileTypeTable, fileNum))
if err != nil {
return nil, err
}
return table.NewReader(f, &d.icmpOpts), nil
}

Просмотреть файл

@ -81,29 +81,83 @@ func cloneFileSystem(srcFS db.FileSystem, dirname string) (db.FileSystem, error)
return dstFS, nil return dstFS, nil
} }
func TestOpen(t *testing.T) { func TestBasic(t *testing.T) {
dirnames := []string{ testCases := []struct {
"db-stage-1", dirname string
"db-stage-2", wantMap map[string]string
"db-stage-3", }{
"db-stage-4", {
"db-stage-1",
map[string]string{
"aaa": "",
"bar": "",
"baz": "",
"foo": "",
"quux": "",
"zzz": "",
},
},
{
"db-stage-2",
map[string]string{
"aaa": "",
"bar": "",
"baz": "three",
"foo": "four",
"quux": "",
"zzz": "",
},
},
{
"db-stage-3",
map[string]string{
"aaa": "",
"bar": "",
"baz": "three",
"foo": "four",
"quux": "",
"zzz": "",
},
},
{
"db-stage-4",
map[string]string{
"aaa": "",
"bar": "",
"baz": "",
"foo": "five",
"quux": "six",
"zzz": "",
},
},
} }
for _, dirname := range dirnames { for _, tc := range testCases {
fs, err := cloneFileSystem(db.DefaultFileSystem, "../testdata/"+dirname) fs, err := cloneFileSystem(db.DefaultFileSystem, "../testdata/"+tc.dirname)
if err != nil { if err != nil {
t.Errorf("%s: cloneFileSystem failed: %v", dirname, err) t.Errorf("%s: cloneFileSystem failed: %v", tc.dirname, err)
continue continue
} }
d, err := Open("", &db.Options{ d, err := Open("", &db.Options{
FileSystem: fs, FileSystem: fs,
}) })
if err != nil { if err != nil {
t.Errorf("%s: Open failed: %v", dirname, err) t.Errorf("%s: Open failed: %v", tc.dirname, err)
continue continue
} }
for key, want := range tc.wantMap {
got, err := d.Get([]byte(key), nil)
if err != nil && err != db.ErrNotFound {
t.Errorf("%s: Get(%q) failed: %v", tc.dirname, key, err)
continue
}
if string(got) != string(want) {
t.Errorf("%s: Get(%q): got %q, want %q", tc.dirname, key, got, want)
continue
}
}
err = d.Close() err = d.Close()
if err != nil { if err != nil {
t.Errorf("%s: Close failed: %v", dirname, err) t.Errorf("%s: Close failed: %v", tc.dirname, err)
continue continue
} }
} }

Просмотреть файл

@ -5,7 +5,6 @@
package leveldb package leveldb
import ( import (
"errors"
"fmt" "fmt"
"sort" "sort"
@ -98,6 +97,10 @@ func (v *version) checkOrdering(icmp db.Comparer) error {
return nil return nil
} }
type tableOpener interface {
openTable(fileNum uint64) (db.DB, error)
}
// get looks up the internal key k0 in v's tables such that k and k0 have the // get looks up the internal key k0 in v's tables such that k and k0 have the
// same user key, and k0's sequence number is the highest such value that is // same user key, and k0's sequence number is the highest such value that is
// less than or equal to k's sequence number. // less than or equal to k's sequence number.
@ -105,7 +108,7 @@ func (v *version) checkOrdering(icmp db.Comparer) error {
// If k0's kind is set, the user key for that previous set action is returned. // If k0's kind is set, the user key for that previous set action is returned.
// If k0's kind is delete, the db.ErrNotFound error is returned. // If k0's kind is delete, the db.ErrNotFound error is returned.
// If there is no such k0, the db.ErrNotFound error is returned. // If there is no such k0, the db.ErrNotFound error is returned.
func (v *version) get(k internalKey, ucmp db.Comparer, ro *db.ReadOptions) ([]byte, error) { func (v *version) get(k internalKey, tOpener tableOpener, ucmp db.Comparer, ro *db.ReadOptions) ([]byte, error) {
ukey := k.ukey() ukey := k.ukey()
// get looks for k0 inside the on-disk table defined by f. Due to the order // get looks for k0 inside the on-disk table defined by f. Due to the order
// in which we search the tables, and the internalKeyComparer's ordering // in which we search the tables, and the internalKeyComparer's ordering
@ -116,10 +119,11 @@ func (v *version) get(k internalKey, ucmp db.Comparer, ro *db.ReadOptions) ([]by
// corruption error. If the search was not conclusive, we move on to the // corruption error. If the search was not conclusive, we move on to the
// next table. // next table.
get := func(f fileMetadata) (val []byte, conclusive bool, err error) { get := func(f fileMetadata) (val []byte, conclusive bool, err error) {
b, err := open(f) b, err := tOpener.openTable(f.fileNum)
if err != nil { if err != nil {
return nil, true, err return nil, true, err
} }
defer b.Close()
t := b.Find(k, ro) t := b.Find(k, ro)
if !t.Next() { if !t.Next() {
err = t.Close() err = t.Close()
@ -186,7 +190,3 @@ func (v *version) get(k internalKey, ucmp db.Comparer, ro *db.ReadOptions) ([]by
} }
return nil, db.ErrNotFound return nil, db.ErrNotFound
} }
var open = func(f fileMetadata) (db.DB, error) {
return nil, errors.New("unimplemented")
}

Просмотреть файл

@ -14,6 +14,12 @@ import (
"code.google.com/p/leveldb-go/leveldb/memdb" "code.google.com/p/leveldb-go/leveldb/memdb"
) )
type tableOpenerFunc func(fileNum uint64) (db.DB, error)
func (f tableOpenerFunc) openTable(fileNum uint64) (db.DB, error) {
return f(fileNum)
}
func TestVersion(t *testing.T) { func TestVersion(t *testing.T) {
// makeIkey converts a string like "foo.DEL.123" into an internal key // makeIkey converts a string like "foo.DEL.123" into an internal key
// consisting of a user key "foo", kind delete, and sequence number 123. // consisting of a user key "foo", kind delete, and sequence number 123.
@ -27,10 +33,7 @@ func TestVersion(t *testing.T) {
ukey := x[0] ukey := x[0]
kind := kinds[x[1]] kind := kinds[x[1]]
seqNum, _ := strconv.ParseUint(x[2], 10, 64) seqNum, _ := strconv.ParseUint(x[2], 10, 64)
ikey := make(internalKey, len(ukey)+8) return makeInternalKey(nil, []byte(ukey), kind, seqNum)
copy(ikey, ukey)
ikey.encodeTrailer(kind, seqNum)
return ikey
} }
// testTable is a table to insert into a version. // testTable is a table to insert into a version.
@ -446,13 +449,13 @@ func TestVersion(t *testing.T) {
// m is a map from file numbers to DBs. // m is a map from file numbers to DBs.
m := map[uint64]db.DB{} m := map[uint64]db.DB{}
open = func(f fileMetadata) (db.DB, error) { tableOpener := tableOpenerFunc(func(fileNum uint64) (db.DB, error) {
d, ok := m[f.fileNum] d, ok := m[fileNum]
if !ok { if !ok {
return nil, errors.New("no such file") return nil, errors.New("no such file")
} }
return d, nil return d, nil
} })
v := version{} v := version{}
for _, tt := range tc.tables { for _, tt := range tc.tables {
@ -501,7 +504,7 @@ func TestVersion(t *testing.T) {
for _, query := range tc.queries { for _, query := range tc.queries {
s := strings.Split(query, " ") s := strings.Split(query, " ")
val, err := v.get(makeIkey(s[0]), db.DefaultComparer, nil) val, err := v.get(makeIkey(s[0]), tableOpener, db.DefaultComparer, nil)
got, want := "", s[1] got, want := "", s[1]
if err != nil { if err != nil {
if err != db.ErrNotFound { if err != db.ErrNotFound {