Garbage collection fix (#110)
* Garbage collection fix Issues fixed: 1. Garbage collection was not able to sustain the required size limit. 2. Exposed more badger options to help in compression 3. Fixed runtime bug encountered when event records with nil map was retrieved. * Incorporated comments * Incorporated Duke's comments
This commit is contained in:
Родитель
c29fcb8723
Коммит
3506d40246
|
@ -1,4 +1,4 @@
|
|||
package untyped
|
||||
package common
|
||||
|
||||
import (
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
|
@ -25,7 +25,7 @@ func deleteKeys(db badgerwrap.DB, keysForDelete [][]byte) (error, int) {
|
|||
return nil, deletedKeysInThisBatch
|
||||
}
|
||||
|
||||
func DeleteKeysWithPrefix(keyPrefix []byte, db badgerwrap.DB, deletionBatchSize int) (error, float64, float64) {
|
||||
func DeleteKeysWithPrefix(keyPrefix []byte, db badgerwrap.DB, deletionBatchSize int) (error, int, int) {
|
||||
numOfKeysToDelete := 0
|
||||
numOfKeysDeleted := 0
|
||||
keysLeftToDelete := true
|
||||
|
@ -60,7 +60,7 @@ func DeleteKeysWithPrefix(keyPrefix []byte, db badgerwrap.DB, deletionBatchSize
|
|||
numOfKeysToDelete += len(keysThisBatch)
|
||||
numOfKeysDeleted += deletedKeysInThisBatch
|
||||
if err != nil {
|
||||
return err, float64(numOfKeysDeleted), float64(numOfKeysToDelete)
|
||||
return err, numOfKeysDeleted, numOfKeysToDelete
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,6 +69,21 @@ func DeleteKeysWithPrefix(keyPrefix []byte, db badgerwrap.DB, deletionBatchSize
|
|||
}
|
||||
}
|
||||
|
||||
return nil, float64(numOfKeysDeleted), float64(numOfKeysToDelete)
|
||||
return nil, numOfKeysDeleted, numOfKeysToDelete
|
||||
|
||||
}
|
||||
|
||||
func GetTotalKeyCount(db badgerwrap.DB) uint64 {
|
||||
var totalKeyCount uint64 = 0
|
||||
_ = db.View(func(txn badgerwrap.Txn) error {
|
||||
iterOpt := badger.DefaultIteratorOptions
|
||||
iterOpt.PrefetchValues = false
|
||||
it := txn.NewIterator(iterOpt)
|
||||
defer it.Close()
|
||||
for it.Rewind(); it.Valid(); it.Next() {
|
||||
totalKeyCount++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return totalKeyCount
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package untyped
|
||||
package common
|
||||
|
||||
import (
|
||||
"github.com/dgraph-io/badger/v2"
|
||||
|
@ -14,8 +14,8 @@ func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteAllKeys(t *testing.T) {
|
|||
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
|
||||
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix), db, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, float64(4), numOfDeletedKeys)
|
||||
assert.Equal(t, float64(4), numOfKeysToDelete)
|
||||
assert.Equal(t, 4, numOfDeletedKeys)
|
||||
assert.Equal(t, 4, numOfKeysToDelete)
|
||||
}
|
||||
|
||||
func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteNoKeys(t *testing.T) {
|
||||
|
@ -23,8 +23,8 @@ func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteNoKeys(t *testing.T) {
|
|||
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
|
||||
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix+"random"), db, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, float64(0), numOfDeletedKeys)
|
||||
assert.Equal(t, float64(0), numOfKeysToDelete)
|
||||
assert.Equal(t, 0, numOfDeletedKeys)
|
||||
assert.Equal(t, 0, numOfKeysToDelete)
|
||||
}
|
||||
|
||||
func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteSomeKeys(t *testing.T) {
|
||||
|
@ -33,8 +33,25 @@ func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteSomeKeys(t *testing.T) {
|
|||
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix("randomStuff"+commonPrefix))
|
||||
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix), db, 10)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, float64(4), numOfDeletedKeys)
|
||||
assert.Equal(t, float64(4), numOfKeysToDelete)
|
||||
assert.Equal(t, 4, numOfDeletedKeys)
|
||||
assert.Equal(t, 4, numOfKeysToDelete)
|
||||
}
|
||||
|
||||
func Test_Db_Utilities_GetTotalKeyCount_SomeKeys(t *testing.T) {
|
||||
db := helper_get_db(t)
|
||||
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
|
||||
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix("randomStuff"+commonPrefix))
|
||||
numberOfKeys := GetTotalKeyCount(db)
|
||||
|
||||
// expected count is 8 as each call to helper_add_keys_to_db adds keys in 4 tables
|
||||
expectedNumberOfKeys := 8
|
||||
assert.Equal(t, uint64(expectedNumberOfKeys), numberOfKeys)
|
||||
}
|
||||
|
||||
func Test_Db_Utilities_GetTotalKeyCount_NoKeys(t *testing.T) {
|
||||
db := helper_get_db(t)
|
||||
numberOfKeys := GetTotalKeyCount(db)
|
||||
assert.Equal(t, uint64(0), numberOfKeys)
|
||||
}
|
||||
|
||||
func helper_get_db(t *testing.T) badgerwrap.DB {
|
|
@ -27,11 +27,10 @@ func Test_ParseKey_Start_Parts(t *testing.T) {
|
|||
assert.Equal(t, fmt.Errorf("key should start with /: %v", keyWith2Parts), err)
|
||||
}
|
||||
|
||||
|
||||
func Test_ParseKey_Success(t *testing.T) {
|
||||
keyWith2Parts := "/part1/part2/part3/part4/part5/part6"
|
||||
err, parts := ParseKey(keyWith2Parts)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, 7, len(parts))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -107,6 +107,11 @@ func storeMinutes(tables typed.Tables, txn badgerwrap.Txn, minToCount map[int64]
|
|||
return errors.Wrap(err, "Could not get event record")
|
||||
}
|
||||
|
||||
// some event records were being returned with nil MapMinToEvents, this was causing runtime exception. Adding a TODO to investigate why these kind of records exist.
|
||||
if eventRecord == nil || eventRecord.MapMinToEvents == nil {
|
||||
return errors.Wrap(err, "Either retrieved event record is nil or its MapMinToEvents is nil")
|
||||
}
|
||||
|
||||
if _, ok := eventRecord.MapMinToEvents[unixTime]; !ok {
|
||||
eventRecord.MapMinToEvents[unixTime] = &typed.EventCounts{MapReasonToCount: make(map[string]int32)}
|
||||
}
|
||||
|
|
|
@ -51,10 +51,13 @@ type SloopConfig struct {
|
|||
DisplayContext string `json:"displayContext"`
|
||||
ApiServerHost string `json:"apiServerHost"`
|
||||
WatchCrds bool `json:"watchCrds"`
|
||||
ThresholdForGC float64 `json:"threshold for GC"`
|
||||
RestoreDatabaseFile string `json:"restoreDatabaseFile"`
|
||||
BadgerDiscardRatio float64 `json:"badgerDiscardRatio"`
|
||||
BadgerVLogGCFreq time.Duration `json:"badgerVLogGCFreq"`
|
||||
BadgerMaxTableSize int64 `json:"badgerMaxTableSize"`
|
||||
BadgerLevelOneSize int64 `json:"badgerLevelOneSize"`
|
||||
BadgerLevSizeMultiplier int `json:"badgerLevSizeMultiplier"`
|
||||
BadgerKeepL0InMemory bool `json:"badgerKeepL0InMemory"`
|
||||
BadgerVLogFileSize int64 `json:"badgerVLogFileSize"`
|
||||
BadgerVLogMaxEntries uint `json:"badgerVLogMaxEntries"`
|
||||
|
@ -92,8 +95,11 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
|
|||
fs.BoolVar(&config.WatchCrds, "watch-crds", true, "Watch for activity for CRDs")
|
||||
fs.StringVar(&config.RestoreDatabaseFile, "restore-database-file", "", "Restore database from backup file into current context.")
|
||||
fs.Float64Var(&config.BadgerDiscardRatio, "badger-discard-ratio", 0.1, "Badger value log GC uses this value to decide if it wants to compact a vlog file. Smaller values free more disk space but use more computing resources")
|
||||
fs.Float64Var(&config.ThresholdForGC, "gc-threshold", 0.8, "Threshold for GC to start garbage collecting")
|
||||
fs.DurationVar(&config.BadgerVLogGCFreq, "badger-vlog-gc-freq", time.Minute*1, "Frequency of running badger's ValueLogGC")
|
||||
fs.Int64Var(&config.BadgerMaxTableSize, "badger-max-table-size", 0, "Max LSM table size in bytes. 0 = use badger default")
|
||||
fs.Int64Var(&config.BadgerLevelOneSize, "badger-level-one-size", 0, "The maximum total size for Level 1. 0 = use badger default")
|
||||
fs.IntVar(&config.BadgerLevSizeMultiplier, "badger-level-size-multiplier", 0, "The ratio between the maximum sizes of contiguous levels in the LSM. 0 = use badger default")
|
||||
fs.BoolVar(&config.BadgerKeepL0InMemory, "badger-keep-l0-in-memory", true, "Keeps all level 0 tables in memory for faster writes and compactions")
|
||||
fs.Int64Var(&config.BadgerVLogFileSize, "badger-vlog-file-size", 0, "Max size in bytes per value log file. 0 = use badger default")
|
||||
fs.UintVar(&config.BadgerVLogMaxEntries, "badger-vlog-max-entries", 0, "Max number of entries per value log files. 0 = use badger default")
|
||||
|
|
|
@ -70,6 +70,8 @@ func RealMain() error {
|
|||
BadgerNumL0Tables: conf.BadgerNumL0Tables,
|
||||
BadgerNumL0TablesStall: conf.BadgerNumL0TablesStall,
|
||||
BadgerSyncWrites: conf.BadgerSyncWrites,
|
||||
BadgerLevelOneSize: conf.BadgerLevelOneSize,
|
||||
BadgerLevSizeMultiplier: conf.BadgerLevSizeMultiplier,
|
||||
}
|
||||
db, err := untyped.OpenStore(factory, storeConfig)
|
||||
if err != nil {
|
||||
|
@ -129,6 +131,7 @@ func RealMain() error {
|
|||
BadgerDiscardRatio: conf.BadgerDiscardRatio,
|
||||
BadgerVLogGCFreq: conf.BadgerVLogGCFreq,
|
||||
DeletionBatchSize: conf.DeletionBatchSize,
|
||||
GCThreshold: conf.ThresholdForGC,
|
||||
}
|
||||
storemgr = storemanager.NewStoreManager(tables, storeCfg, fs)
|
||||
storemgr.Start()
|
||||
|
|
|
@ -29,6 +29,8 @@ type Config struct {
|
|||
BadgerNumL0Tables int
|
||||
BadgerNumL0TablesStall int
|
||||
BadgerSyncWrites bool
|
||||
BadgerLevelOneSize int64
|
||||
BadgerLevSizeMultiplier int
|
||||
}
|
||||
|
||||
func OpenStore(factory badgerwrap.Factory, config *Config) (badgerwrap.DB, error) {
|
||||
|
@ -76,7 +78,16 @@ func OpenStore(factory badgerwrap.Factory, config *Config) (badgerwrap.DB, error
|
|||
opts = opts.WithNumLevelZeroTablesStall(config.BadgerNumL0TablesStall)
|
||||
}
|
||||
|
||||
opts.WithSyncWrites(config.BadgerSyncWrites)
|
||||
if config.BadgerLevelOneSize != 0 {
|
||||
opts = opts.WithLevelOneSize(config.BadgerLevelOneSize)
|
||||
}
|
||||
|
||||
if config.BadgerLevSizeMultiplier != 0 {
|
||||
opts = opts.WithLevelSizeMultiplier(config.BadgerLevSizeMultiplier)
|
||||
}
|
||||
|
||||
opts = opts.WithSyncWrites(config.BadgerSyncWrites)
|
||||
|
||||
db, err := factory.Open(opts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("badger.OpenStore failed with: %v", err)
|
||||
|
|
|
@ -15,7 +15,9 @@ import (
|
|||
"github.com/salesforce/sloop/pkg/sloop/common"
|
||||
"github.com/salesforce/sloop/pkg/sloop/store/typed"
|
||||
"github.com/salesforce/sloop/pkg/sloop/store/untyped"
|
||||
"github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap"
|
||||
"github.com/spf13/afero"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
@ -35,6 +37,7 @@ var (
|
|||
metricValueLogGcRunCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_valueLoggc_run_count"})
|
||||
metricValueLogGcLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueLoggc_latency_sec"})
|
||||
metricValueLogGcRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueLoggc_running"})
|
||||
metricTotalNumberOfKeys = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_total_number_of_keys"})
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
|
@ -45,6 +48,7 @@ type Config struct {
|
|||
BadgerDiscardRatio float64
|
||||
BadgerVLogGCFreq time.Duration
|
||||
DeletionBatchSize int
|
||||
GCThreshold float64
|
||||
}
|
||||
|
||||
type StoreManager struct {
|
||||
|
@ -97,10 +101,10 @@ func (sm *StoreManager) gcLoop() {
|
|||
metricGcRunCount.Inc()
|
||||
before := time.Now()
|
||||
metricGcRunning.Set(1)
|
||||
cleanUpPerformed, numOfDeletedKeys, numOfKeysToDelete, err := doCleanup(sm.tables, sm.config.TimeLimit, sm.config.SizeLimitBytes, sm.stats, sm.config.DeletionBatchSize)
|
||||
cleanUpPerformed, numOfDeletedKeys, numOfKeysToDelete, err := doCleanup(sm.tables, sm.config.TimeLimit, sm.config.SizeLimitBytes, sm.stats, sm.config.DeletionBatchSize, sm.config.GCThreshold)
|
||||
metricGcCleanUpPerformed.Set(common.BoolToFloat(cleanUpPerformed))
|
||||
metricGcDeletedNumberOfKeys.Set(numOfDeletedKeys)
|
||||
metricGcNumberOfKeysToDelete.Set(numOfKeysToDelete)
|
||||
metricGcDeletedNumberOfKeys.Set(float64(numOfDeletedKeys))
|
||||
metricGcNumberOfKeysToDelete.Set(float64(numOfKeysToDelete))
|
||||
metricGcRunning.Set(0)
|
||||
if err == nil {
|
||||
metricGcSuccessCount.Inc()
|
||||
|
@ -169,7 +173,8 @@ func (sm *StoreManager) refreshStats() *storeStats {
|
|||
return sm.stats
|
||||
}
|
||||
|
||||
func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int, stats *storeStats, deletionBatchSize int) (bool, float64, float64, error) {
|
||||
func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int, stats *storeStats, deletionBatchSize int, gcThreshold float64) (bool, int64, int64, error) {
|
||||
|
||||
ok, minPartition, maxPartition, err := tables.GetMinAndMaxPartition()
|
||||
if err != nil {
|
||||
return false, 0, 0, fmt.Errorf("failed to get min partition : %s, max partition: %s, err:%v", minPartition, maxPartition, err)
|
||||
|
@ -188,28 +193,37 @@ func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int,
|
|||
metricAgeOfMaximumPartition.Set(maxPartitionAge)
|
||||
}
|
||||
|
||||
var totalNumOfDeletedKeys float64 = 0
|
||||
var totalNumOfKeysToDelete float64 = 0
|
||||
var totalNumOfDeletedKeys int64 = 0
|
||||
var totalNumOfKeysToDelete int64 = 0
|
||||
anyCleanupPerformed := false
|
||||
if cleanUpTimeCondition(minPartition, maxPartition, timeLimit) || cleanUpFileSizeCondition(stats, sizeLimitBytes) {
|
||||
partStart, partEnd, err := untyped.GetTimeRangeForPartition(minPartition)
|
||||
glog.Infof("GC removing partition %q with data from %v to %v (err %v)", minPartition, partStart, partEnd, err)
|
||||
var errMessages []string
|
||||
for _, tableName := range tables.GetTableNames() {
|
||||
prefix := fmt.Sprintf("/%s/%s", tableName, minPartition)
|
||||
start := time.Now()
|
||||
err, numOfDeletedKeys, numOfKeysToDelete := untyped.DeleteKeysWithPrefix([]byte(prefix), tables.Db(), deletionBatchSize)
|
||||
metricGcDeletedNumberOfKeysByTable.WithLabelValues(fmt.Sprintf("%v", tableName)).Set(numOfDeletedKeys)
|
||||
totalNumOfDeletedKeys += numOfDeletedKeys
|
||||
totalNumOfKeysToDelete += numOfKeysToDelete
|
||||
elapsed := time.Since(start)
|
||||
glog.Infof("Call to DeleteKeysWithPrefix(%v) took %v and removed %f keys with error: %v", prefix, elapsed, numOfDeletedKeys, err)
|
||||
if err != nil {
|
||||
errMessages = append(errMessages, fmt.Sprintf("failed to cleanup with min key: %s, elapsed: %v,err: %v,", prefix, elapsed, err))
|
||||
}
|
||||
anyCleanupPerformed = true
|
||||
minPartitionStartTime, err := untyped.GetTimeForPartition(minPartition)
|
||||
if err != nil {
|
||||
return false, 0, 0, err
|
||||
}
|
||||
|
||||
var numOfKeysToDeleteForFileSizeCondition int64 = 0
|
||||
isFileSizeConditionMet, garbageCollectionRatio := cleanUpFileSizeCondition(stats, sizeLimitBytes, gcThreshold)
|
||||
|
||||
if isFileSizeConditionMet {
|
||||
numOfKeysToDeleteForFileSizeCondition = getNumberOfKeysToDelete(tables.Db(), garbageCollectionRatio)
|
||||
}
|
||||
|
||||
beforeGCTime := time.Now()
|
||||
for cleanUpTimeCondition(minPartition, maxPartition, timeLimit) || numOfKeysToDeleteForFileSizeCondition > 0 {
|
||||
|
||||
numOfDeletedKeysforPrefix, numOfKeysToDeleteForPrefix, errMessages := deletePartition(minPartition, tables, deletionBatchSize)
|
||||
totalNumOfDeletedKeys += int64(numOfDeletedKeysforPrefix)
|
||||
totalNumOfKeysToDelete += int64(numOfKeysToDeleteForPrefix)
|
||||
anyCleanupPerformed = true
|
||||
minPartitionStartTime = minPartitionStartTime.Add(untyped.GetPartitionDuration())
|
||||
minPartition = untyped.GetPartitionId(minPartitionStartTime)
|
||||
|
||||
minPartitionAge, err := untyped.GetAgeOfPartitionInHours(minPartition)
|
||||
if err != nil || minPartitionAge < 0 {
|
||||
return false, totalNumOfDeletedKeys, totalNumOfKeysToDelete, fmt.Errorf("minimun partition age: %f cannot be less than zero", minPartitionAge)
|
||||
}
|
||||
|
||||
metricAgeOfMinimumPartition.Set(minPartitionAge)
|
||||
if len(errMessages) != 0 {
|
||||
var errMsg string
|
||||
for _, er := range errMessages {
|
||||
|
@ -217,11 +231,56 @@ func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int,
|
|||
}
|
||||
return false, totalNumOfDeletedKeys, totalNumOfKeysToDelete, fmt.Errorf(errMsg)
|
||||
}
|
||||
|
||||
glog.Infof("Deleted Number of keys so far: %v ", totalNumOfDeletedKeys)
|
||||
if numOfKeysToDeleteForFileSizeCondition > totalNumOfDeletedKeys {
|
||||
numOfKeysToDeleteForFileSizeCondition -= totalNumOfDeletedKeys
|
||||
} else {
|
||||
// Deleted number of keys is greater or equal. We have reached the required deletion.
|
||||
numOfKeysToDeleteForFileSizeCondition = 0
|
||||
}
|
||||
|
||||
glog.Infof("Remaining number of keys to delete: %v ", numOfKeysToDeleteForFileSizeCondition)
|
||||
}
|
||||
|
||||
elapsed := time.Since(beforeGCTime)
|
||||
glog.Infof("Deletion of prefixes took %v and removed %d keys with error: %v", elapsed, totalNumOfDeletedKeys, err)
|
||||
|
||||
beforeDropPrefix := time.Now()
|
||||
|
||||
// dropping prefix to force compression
|
||||
err = tables.Db().DropPrefix([]byte{})
|
||||
glog.Infof("Drop prefix took %v with error: %v", time.Since(beforeDropPrefix), err)
|
||||
return anyCleanupPerformed, totalNumOfDeletedKeys, totalNumOfKeysToDelete, nil
|
||||
}
|
||||
|
||||
func deletePartition(minPartition string, tables typed.Tables, deletionBatchSize int) (int, int, []string) {
|
||||
totalNumOfDeletedKeysforPrefix := 0
|
||||
totalNumOfKeysToDeleteForPrefix := 0
|
||||
numOfDeletedKeysforPrefix := 0
|
||||
numOfKeysToDeleteForPrefix := 0
|
||||
|
||||
partStart, partEnd, err := untyped.GetTimeRangeForPartition(minPartition)
|
||||
glog.Infof("GC removing partition %q with data from %v to %v (err %v)", minPartition, partStart, partEnd, err)
|
||||
var errMessages []string
|
||||
for _, tableName := range tables.GetTableNames() {
|
||||
prefix := fmt.Sprintf("/%s/%s", tableName, minPartition)
|
||||
start := time.Now()
|
||||
err, numOfDeletedKeysforPrefix, numOfKeysToDeleteForPrefix = common.DeleteKeysWithPrefix([]byte(prefix), tables.Db(), deletionBatchSize)
|
||||
metricGcDeletedNumberOfKeysByTable.WithLabelValues(fmt.Sprintf("%v", tableName)).Set(float64(numOfDeletedKeysforPrefix))
|
||||
elapsed := time.Since(start)
|
||||
glog.Infof("Call to DeleteKeysWithPrefix(%v) took %v and removed %d keys with error: %v", prefix, elapsed, numOfDeletedKeysforPrefix, err)
|
||||
if err != nil {
|
||||
errMessages = append(errMessages, fmt.Sprintf("failed to cleanup with min key: %s, elapsed: %v,err: %v,", prefix, elapsed, err))
|
||||
}
|
||||
|
||||
totalNumOfDeletedKeysforPrefix += numOfDeletedKeysforPrefix
|
||||
totalNumOfKeysToDeleteForPrefix += numOfKeysToDeleteForPrefix
|
||||
}
|
||||
|
||||
return totalNumOfDeletedKeysforPrefix, totalNumOfKeysToDeleteForPrefix, errMessages
|
||||
}
|
||||
|
||||
func cleanUpTimeCondition(minPartition string, maxPartition string, timeLimit time.Duration) bool {
|
||||
oldestTime, _, err := untyped.GetTimeRangeForPartition(minPartition)
|
||||
if err != nil {
|
||||
|
@ -244,11 +303,32 @@ func cleanUpTimeCondition(minPartition string, maxPartition string, timeLimit ti
|
|||
return false
|
||||
}
|
||||
|
||||
func cleanUpFileSizeCondition(stats *storeStats, sizeLimitBytes int) bool {
|
||||
if stats.DiskSizeBytes > int64(sizeLimitBytes) {
|
||||
glog.Infof("Start cleaning up because current file size: %v exceeds file size: %v", stats.DiskSizeBytes, sizeLimitBytes)
|
||||
return true
|
||||
func cleanUpFileSizeCondition(stats *storeStats, sizeLimitBytes int, gcThreshold float64) (bool, float64) {
|
||||
|
||||
// gcthreshold is the threshold when reached would trigger the garbage collection. Its because we want to proactively start GC when the size limit is about to hit.
|
||||
sizeThreshold := gcThreshold * float64(sizeLimitBytes)
|
||||
currentDiskSize := float64(stats.DiskSizeBytes)
|
||||
if currentDiskSize > sizeThreshold {
|
||||
glog.Infof("Start cleaning up because current file size: %v exceeds file size threshold: %v", stats.DiskSizeBytes, sizeThreshold)
|
||||
|
||||
garbageCollectionRatio := (currentDiskSize - sizeThreshold) / currentDiskSize
|
||||
return true, garbageCollectionRatio
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Can not clean up, disk size: %v is not exceeding size limit: %v yet", stats.DiskSizeBytes, uint64(sizeLimitBytes))
|
||||
return false
|
||||
return false, 0.0
|
||||
}
|
||||
|
||||
func getNumberOfKeysToDelete(db badgerwrap.DB, garbageCollectionRatio float64) int64 {
|
||||
totalKeyCount := float64(common.GetTotalKeyCount(db))
|
||||
metricTotalNumberOfKeys.Set(totalKeyCount)
|
||||
|
||||
if garbageCollectionRatio <= 0 || garbageCollectionRatio > 1 {
|
||||
// print float here and below
|
||||
glog.V(2).Infof("Garbage collection ratio out of bounds. Unexpected ratio: %f", garbageCollectionRatio)
|
||||
return 0
|
||||
}
|
||||
|
||||
keysToDelete := garbageCollectionRatio * totalKeyCount
|
||||
return int64(math.Ceil(keysToDelete))
|
||||
}
|
||||
|
|
|
@ -37,8 +37,9 @@ func Test_cleanUpFileSizeCondition_True(t *testing.T) {
|
|||
DiskSizeBytes: 10,
|
||||
}
|
||||
|
||||
flag := cleanUpFileSizeCondition(stats, 3)
|
||||
flag, ratio := cleanUpFileSizeCondition(stats, 5, 1)
|
||||
assert.True(t, flag)
|
||||
assert.Equal(t, 0.5, ratio)
|
||||
}
|
||||
|
||||
func Test_cleanUpFileSizeCondition_False(t *testing.T) {
|
||||
|
@ -46,8 +47,9 @@ func Test_cleanUpFileSizeCondition_False(t *testing.T) {
|
|||
DiskSizeBytes: 10,
|
||||
}
|
||||
|
||||
flag := cleanUpFileSizeCondition(stats, 100)
|
||||
flag, ratio := cleanUpFileSizeCondition(stats, 100, 0.8)
|
||||
assert.False(t, flag)
|
||||
assert.Equal(t, 0.0, ratio)
|
||||
}
|
||||
|
||||
func Test_cleanUpTimeCondition(t *testing.T) {
|
||||
|
@ -122,7 +124,7 @@ func Test_doCleanup_true(t *testing.T) {
|
|||
DiskSizeBytes: 10,
|
||||
}
|
||||
|
||||
flag, _, _, err := doCleanup(tables, time.Hour, 2, stats, 10)
|
||||
flag, _, _, err := doCleanup(tables, time.Hour, 2, stats, 10, 1)
|
||||
assert.True(t, flag)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
@ -135,7 +137,25 @@ func Test_doCleanup_false(t *testing.T) {
|
|||
DiskSizeBytes: 10,
|
||||
}
|
||||
|
||||
flag, _, _, err := doCleanup(tables, time.Hour, 1000, stats, 10)
|
||||
flag, _, _, err := doCleanup(tables, time.Hour, 1000, stats, 10, 1)
|
||||
assert.False(t, flag)
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func Test_getNumberOfKeysToDelete_Success(t *testing.T) {
|
||||
db := help_get_db(t)
|
||||
keysToDelete := getNumberOfKeysToDelete(db, 0.5)
|
||||
assert.Equal(t, int64(2), keysToDelete)
|
||||
}
|
||||
|
||||
func Test_getNumberOfKeysToDelete_Failure(t *testing.T) {
|
||||
db := help_get_db(t)
|
||||
keysToDelete := getNumberOfKeysToDelete(db, 0)
|
||||
assert.Equal(t, int64(0), keysToDelete)
|
||||
}
|
||||
|
||||
func Test_getNumberOfKeysToDelete_TestCeiling(t *testing.T) {
|
||||
db := help_get_db(t)
|
||||
keysToDelete := getNumberOfKeysToDelete(db, 0.33)
|
||||
assert.Equal(t, int64(2), keysToDelete)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче