leveldb: track pending outputs.

R=bradfitz
CC=golang-dev
https://codereview.appspot.com/21500043
This commit is contained in:
Nigel Tao 2013-11-05 11:56:40 +11:00
Родитель c6c9caddd9
Коммит 76619473b5
2 изменённых файлов: 46 добавлений и 17 удалений

Просмотреть файл

@ -207,11 +207,15 @@ func (d *DB) compact1() error {
})
}
ve, err := d.compactDiskTables(c)
ve, pendingOutputs, err := d.compactDiskTables(c)
if err != nil {
return err
}
if err := d.versions.logAndApply(d.dirname, ve); err != nil {
err = d.versions.logAndApply(d.dirname, ve)
for _, fileNum := range pendingOutputs {
delete(d.pendingOutputs, fileNum)
}
if err != nil {
return err
}
d.deleteObsoleteFiles()
@ -233,6 +237,7 @@ func (d *DB) compactMemTable() error {
{level: 0, meta: meta},
},
})
delete(d.pendingOutputs, meta.fileNum)
if err != nil {
return err
}
@ -246,7 +251,16 @@ func (d *DB) compactMemTable() error {
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, pendingOutputs []uint64, retErr error) {
defer func() {
if retErr != nil {
for _, fileNum := range pendingOutputs {
delete(d.pendingOutputs, fileNum)
}
pendingOutputs = nil
}
}()
// TODO: track snapshots.
smallestSnapshot := d.versions.lastSequence
@ -257,7 +271,7 @@ func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
iter, err := compactionIterator(&d.tableCache, d.icmp, c)
if err != nil {
return nil, err
return nil, pendingOutputs, err
}
// TODO: output to more than one table, if it would otherwise be too large.
@ -330,13 +344,14 @@ func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
if tw == nil {
d.mu.Lock()
fileNum = d.versions.nextFileNum()
// TODO: track pending outputs.
d.pendingOutputs[fileNum] = struct{}{}
pendingOutputs = append(pendingOutputs, fileNum)
d.mu.Unlock()
filename = dbFilename(d.dirname, fileTypeTable, fileNum)
file, err := d.opts.GetFileSystem().Create(filename)
if err != nil {
return nil, err
return nil, pendingOutputs, err
}
tw = table.NewWriter(file, &d.icmpOpts)
@ -346,7 +361,7 @@ func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
}
largest = append(largest[:0], ikey...)
if err := tw.Set(ikey, iter.Value(), nil); err != nil {
return nil, err
return nil, pendingOutputs, err
}
}
@ -372,7 +387,7 @@ func (d *DB) compactDiskTables(c *compaction) (ve *versionEdit, retErr error) {
}] = true
}
}
return ve, nil
return ve, pendingOutputs, nil
}
// compactionIterator returns an iterator over all the tables in a compaction.

Просмотреть файл

@ -75,6 +75,8 @@ type DB struct {
compactionCond sync.Cond
compacting bool
pendingOutputs map[uint64]struct{}
}
var _ db.DB = (*DB)(nil)
@ -237,9 +239,10 @@ func createDB(dirname string, opts *db.Options) (retErr error) {
// Open opens a LevelDB whose files live in the given directory.
func Open(dirname string, opts *db.Options) (*DB, error) {
d := &DB{
dirname: dirname,
opts: opts,
icmp: internalKeyComparer{opts.GetComparer()},
dirname: dirname,
opts: opts,
icmp: internalKeyComparer{opts.GetComparer()},
pendingOutputs: make(map[uint64]struct{}),
}
if opts != nil {
d.icmpOpts = *opts
@ -425,6 +428,10 @@ func (d *DB) replayLogFile(ve *versionEdit, fs db.FileSystem, filename string) (
return 0, err
}
ve.newFiles = append(ve.newFiles, newFileEntry{level: 0, meta: meta})
// Strictly speaking, it's too early to delete meta.fileNum from d.pendingOutputs,
// but we are replaying the log file, which happens before Open returns, so there
// is no possibility of deleteObsoleteFiles being called concurrently here.
delete(d.pendingOutputs, meta.fileNum)
}
return maxSeqNum, nil
@ -441,15 +448,21 @@ func firstError(err0, err1 error) error {
// writeLevel0Table writes a memtable to a level-0 on-disk table.
//
// If no error is returned, it adds the file number of that on-disk table to
// d.pendingOutputs. It is the caller's responsibility to remove that fileNum
// from that set when it has been applied to d.versions.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) writeLevel0Table(fs db.FileSystem, mem *memdb.MemDB) (meta fileMetadata, err error) {
meta.fileNum = d.versions.nextFileNum()
filename := dbFilename(d.dirname, fileTypeTable, meta.fileNum)
// TODO: add meta.fileNum to a set of 'pending outputs' so that a
// concurrent sweep of obsolete db files won't delete the fileNum file.
// It is the caller's responsibility to remove that fileNum from the
// set of pending outputs.
d.pendingOutputs[meta.fileNum] = struct{}{}
defer func(fileNum uint64) {
if err != nil {
delete(d.pendingOutputs, fileNum)
}
}(meta.fileNum)
// Release the d.mu lock while doing I/O.
// Note the unusual order: Unlock and then Lock.
@ -607,9 +620,10 @@ func (d *DB) makeRoomForWrite(force bool) error {
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) deleteObsoleteFiles() {
// TODO: (elsewhere) track pending outputs, and refer to them here.
liveFileNums := map[uint64]struct{}{}
for fileNum := range d.pendingOutputs {
liveFileNums[fileNum] = struct{}{}
}
d.versions.addLiveFileNums(liveFileNums)
logNumber := d.versions.logNumber
manifestFileNumber := d.versions.manifestFileNumber