Drop prefix optimization: replaced DropPrefix with batched key deletion; also added more metrics. (#105)

* GC Fix

* Addressed all comments and added a unit test

Fixed the replay use case and added a unit test

* Removed the GetMin and GetMaxPartitions functions as they are no longer required

* Added more unit test cases

* Removed DropPrefix to improve performance (a sketch of the new deletion pattern follows this list)

* Added some more metrics

* Added unit tests and some more metrics

* Incorporated some comments

* Incorporated all the comments

* Fixed a minor review comment
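
The core of the change: rather than calling Badger's DropPrefix (which proved expensive), keys under a partition prefix are collected and deleted in bounded batches. Below is a minimal standalone sketch of that pattern, written against the badger v2 API directly rather than sloop's badgerwrap layer; the name deletePrefixInBatches and the return shape are illustrative, and the real implementation is DeleteKeysWithPrefix in the diff further down.

package gcsketch

import (
	badger "github.com/dgraph-io/badger/v2"
)

// deletePrefixInBatches collects up to batchSize keys under prefix in a
// read-only view, deletes them in one update transaction, and repeats until
// no keys with that prefix remain. It returns the number of keys deleted.
func deletePrefixInBatches(db *badger.DB, prefix []byte, batchSize int) (int, error) {
	deleted := 0
	for {
		keys := make([][]byte, 0, batchSize)
		err := db.View(func(txn *badger.Txn) error {
			opts := badger.DefaultIteratorOptions
			opts.Prefix = prefix
			opts.PrefetchValues = false // keys only, values are never read
			it := txn.NewIterator(opts)
			defer it.Close()
			for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
				keys = append(keys, it.Item().KeyCopy(nil))
				if len(keys) == batchSize {
					break
				}
			}
			return nil
		})
		if err != nil {
			return deleted, err
		}
		if len(keys) == 0 {
			return deleted, nil
		}
		if err := db.Update(func(txn *badger.Txn) error {
			for _, k := range keys {
				if err := txn.Delete(k); err != nil {
					return err
				}
			}
			return nil
		}); err != nil {
			return deleted, err
		}
		deleted += len(keys)
		if len(keys) < batchSize {
			return deleted, nil // last, partial batch
		}
	}
}

Keeping each delete transaction bounded by the batch size avoids building one huge transaction for an entire partition, which is presumably why the batch size was made configurable.
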
This commit is contained in:
sana-jawad 2020-02-12 17:07:00 -08:00 committed by GitHub
Parent d6f04a9f7c
Commit e0a8d06841
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 241 additions and 34 deletions

View file

@@ -117,10 +117,10 @@ func Test_EventCountTable_Event_Truncated_Before_TruncateTS(t *testing.T) {
assert.Nil(t, err)
tables := typed.NewTableList(db)
evenTSStartTime := "2019-08-29T21:24:55Z"
evenTSEndTime := "2019-08-29T21:27:55Z"
eventStartTime := "2019-08-29T21:24:55Z"
eventEndTime := "2019-08-29T21:27:55Z"
// adding the first event which ends up adding the first partition
addEventCount(t, tables, someEventWatchPTime, evenTSStartTime, evenTSEndTime)
addEventCount(t, tables, someEventWatchPTime, eventStartTime, eventEndTime)
foundKeys, err := findEventKeys(tables, 6)
assert.Nil(t, err)
@@ -163,7 +163,6 @@ func Test_EventCountTable_Events_Added_After_TruncateTS(t *testing.T) {
// adding the first event which ends up adding the first partition
addEventCount(t, tables, eventPTime28August, timestamp28AugustStartTime, timestamp28AugustEndTime)
timestamp29AugustStartTime := "2019-08-29T21:24:55Z"
timestamp29AugustEndTime := "2019-08-29T23:27:55Z"
eventPTime29August, _ := ptypes.TimestampProto(time.Date(2019, 8, 29, 21, 24, 55, 6, time.UTC))
@@ -171,7 +170,6 @@ func Test_EventCountTable_Events_Added_After_TruncateTS(t *testing.T) {
// adding the second event which ends up adding the second partition
addEventCount(t, tables, eventPTime29August, timestamp29AugustStartTime, timestamp29AugustEndTime)
timestamp30AugustStartTime := "2019-08-30T21:21:55Z"
timestamp30AugustEndTime := "2019-08-30T21:23:55Z"
eventPTime30August, _ := ptypes.TimestampProto(time.Date(2019, 8, 30, 21, 24, 55, 6, time.UTC))

View file

@@ -39,6 +39,7 @@ type SloopConfig struct {
MaxDiskMb int `json:"maxDiskMb"`
DebugPlaybackFile string `json:"debugPlaybackFile"`
DebugRecordFile string `json:"debugRecordFile"`
DeletionBatchSize int `json:"deletionBatchSize"`
UseMockBadger bool `json:"mockBadger"`
DisableStoreManager bool `json:"disableStoreManager"`
CleanupFrequency time.Duration `json:"cleanupFrequency" validate:"min=1h,max=120h"`
@@ -80,6 +81,7 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
fs.StringVar(&config.DefaultLookback, "default-lookback", "1h", "Default UX filter lookback")
fs.StringVar(&config.DefaultKind, "default-kind", "_all", "Default UX filter kind")
fs.StringVar(&config.DefaultNamespace, "default-namespace", "default", "Default UX filter namespace")
fs.IntVar(&config.DeletionBatchSize, "deletion-batch-size", 1000, "Size of batch for deletion")
fs.StringVar(&config.UseKubeContext, "context", "", "Use a specific kubernetes context")
fs.StringVar(&config.DisplayContext, "display-context", "", "Use this to override the display context. When running in k8s the context is empty string. This lets you override that (mainly useful if you are running many copies of sloop on different clusters) ")
fs.StringVar(&config.ApiServerHost, "apiserver-host", "", "Kubernetes API server endpoint")
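
The new DeletionBatchSize setting can be supplied via the --deletion-batch-size flag above (default 1000) or, judging by the json tag, as deletionBatchSize in a JSON config file. A hedged sketch of the config-file path, using a cut-down stand-in struct and an invented config fragment rather than the real SloopConfig loading code:

package main

import (
	"encoding/json"
	"fmt"
)

// sketchConfig stands in for the relevant SloopConfig fields; the real struct
// in the diff above has many more.
type sketchConfig struct {
	MaxDiskMb         int `json:"maxDiskMb"`
	DeletionBatchSize int `json:"deletionBatchSize"`
}

func main() {
	// Hypothetical config fragment; 500 overrides the flag default of 1000.
	raw := []byte(`{"maxDiskMb": 32768, "deletionBatchSize": 500}`)
	var cfg sketchConfig
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("GC will delete keys in batches of %d\n", cfg.DeletionBatchSize)
}
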

View file

@@ -124,6 +124,7 @@ func RealMain() error {
SizeLimitBytes: conf.MaxDiskMb * 1024 * 1024,
BadgerDiscardRatio: conf.BadgerDiscardRatio,
BadgerVLogGCFreq: conf.BadgerVLogGCFreq,
DeletionBatchSize: conf.DeletionBatchSize,
}
storemgr = storemanager.NewStoreManager(tables, storeCfg, fs)
storemgr.Start()

View file

@@ -72,7 +72,7 @@ type Item interface {
// EstimatedSize() int64
// ExpiresAt() uint64
// IsDeletedOrExpired() bool
// KeyCopy(dst []byte) []byte
KeyCopy(dst []byte) []byte
// KeySize() int64
// String() string
// UserMeta() byte

View file

@@ -124,6 +124,10 @@ func (i *BadgerItem) ValueCopy(dst []byte) ([]byte, error) {
return i.item.ValueCopy(dst)
}
func (i *BadgerItem) KeyCopy(dst []byte) []byte {
return i.item.KeyCopy(dst)
}
// Iterator
func (i *BadgerIterator) Close() {

View file

@@ -179,6 +179,13 @@ func (i *MockItem) ValueCopy(dst []byte) ([]byte, error) {
return newcopy, nil
}
func (i *MockItem) KeyCopy(dst []byte) []byte {
copy(dst, i.key)
newcopy := make([]byte, len(i.key))
copy(newcopy, i.key)
return newcopy
}
// Iterator
func (i *MockIterator) Close() {

View file

@@ -0,0 +1,74 @@
package untyped
import (
"github.com/dgraph-io/badger/v2"
"github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap"
)
func deleteKeys(db badgerwrap.DB, keysForDelete [][]byte) (error, int) {
deletedKeysInThisBatch := 0
err := db.Update(func(txn badgerwrap.Txn) error {
for _, key := range keysForDelete {
err := txn.Delete(key)
if err != nil {
return err
}
deletedKeysInThisBatch++
}
return nil
})
if err != nil {
return err, deletedKeysInThisBatch
}
return nil, deletedKeysInThisBatch
}
func DeleteKeysWithPrefix(keyPrefix []byte, db badgerwrap.DB, deletionBatchSize int) (error, float64, float64) {
numOfKeysToDelete := 0
numOfKeysDeleted := 0
keysLeftToDelete := true
for keysLeftToDelete {
keysThisBatch := make([][]byte, 0, deletionBatchSize)
// getting the keys to delete that have the given prefix
_ = db.View(func(txn badgerwrap.Txn) error {
iterOpt := badger.DefaultIteratorOptions
iterOpt.Prefix = keyPrefix
iterOpt.AllVersions = false
iterOpt.PrefetchValues = false
it := txn.NewIterator(iterOpt)
defer it.Close()
for it.Seek(keyPrefix); it.ValidForPrefix(keyPrefix); it.Next() {
keyToDel := it.Item().KeyCopy(nil)
keysThisBatch = append(keysThisBatch, keyToDel)
if len(keysThisBatch) == deletionBatchSize {
break
}
}
return nil
})
// deleting the keys in batch
if len(keysThisBatch) > 0 {
err, deletedKeysInThisBatch := deleteKeys(db, keysThisBatch)
numOfKeysToDelete += len(keysThisBatch)
numOfKeysDeleted += deletedKeysInThisBatch
if err != nil {
return err, float64(numOfKeysDeleted), float64(numOfKeysToDelete)
}
}
if len(keysThisBatch) < deletionBatchSize {
keysLeftToDelete = false
}
}
return nil, float64(numOfKeysDeleted), float64(numOfKeysToDelete)
}

View file

@@ -0,0 +1,69 @@
package untyped
import (
"github.com/dgraph-io/badger/v2"
"github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap"
"github.com/stretchr/testify/assert"
"testing"
)
var commonPrefix = "/commonprefix/001546405200/"
func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteAllKeys(t *testing.T) {
db := helper_get_db(t)
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix), db, 10)
assert.Nil(t, err)
assert.Equal(t, float64(4), numOfDeletedKeys)
assert.Equal(t, float64(4), numOfKeysToDelete)
}
func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteNoKeys(t *testing.T) {
db := helper_get_db(t)
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix+"random"), db, 10)
assert.Nil(t, err)
assert.Equal(t, float64(0), numOfDeletedKeys)
assert.Equal(t, float64(0), numOfKeysToDelete)
}
func Test_Db_Utilities_DeleteKeysWithPrefix_DeleteSomeKeys(t *testing.T) {
db := helper_get_db(t)
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix(commonPrefix))
helper_add_keys_to_db(t, db, helper_testKeys_with_common_prefix("randomStuff"+commonPrefix))
err, numOfDeletedKeys, numOfKeysToDelete := DeleteKeysWithPrefix([]byte(commonPrefix), db, 10)
assert.Nil(t, err)
assert.Equal(t, float64(4), numOfDeletedKeys)
assert.Equal(t, float64(4), numOfKeysToDelete)
}
func helper_get_db(t *testing.T) badgerwrap.DB {
db, err := (&badgerwrap.MockFactory{}).Open(badger.DefaultOptions(""))
assert.Nil(t, err)
return db
}
func helper_add_keys_to_db(t *testing.T, db badgerwrap.DB, keys []string) badgerwrap.DB {
err := db.Update(func(txn badgerwrap.Txn) error {
var txerr error
for _, key := range keys {
txerr = txn.Set([]byte(key), []byte{})
if txerr != nil {
return txerr
}
}
return nil
})
assert.Nil(t, err)
return db
}
func helper_testKeys_with_common_prefix(prefix string) []string {
return []string{
// someMaxTs partition
prefix + "Pod/user-j/sync-123/sam-partition-testdata",
prefix + "Pod/user-j/sync-123/sam-partition-test",
prefix + "Pod/user-t/sync-123/sam-partition-testdata",
prefix + "Pod/user-w/sync-123/sam-partition-test",
}
}

View file

@@ -60,9 +60,18 @@ func GetTimeRangeForPartition(partitionId string) (time.Time, time.Time, error)
return oldestTime, newestTime, nil
}
func TestHookSetPartitionDuration(partDuration time.Duration) bool {
func GetAgeOfPartitionInHours(partitionId string) (float64, error) {
timeForPartition, err := GetTimeForPartition(partitionId)
if err != nil {
return -1, err
}
nanosecondsInAnHour := time.Duration(60 * 60 * 1000000000)
return float64(time.Now().Sub(timeForPartition) / nanosecondsInAnHour), nil
}
func TestHookSetPartitionDuration(partDuration time.Duration) {
partitionDuration = partDuration
return true
}
func GetPartitionDuration() time.Duration {
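
In GetAgeOfPartitionInHours above, the hand-written nanosecondsInAnHour constant (60 * 60 * 1000000000 ns) has the same value as time.Hour, and dividing one time.Duration by another truncates, so the age comes out in whole hours. A small standard-library-only sketch of the equivalence:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Pretend the partition's timestamp is 90 minutes in the past.
	partitionTime := time.Now().Add(-90 * time.Minute)

	// Formulation from the diff: divide by a hard-coded hour in nanoseconds.
	nanosecondsInAnHour := time.Duration(60 * 60 * 1000000000)
	ageA := float64(time.Now().Sub(partitionTime) / nanosecondsInAnHour)

	// Same thing with the named constant; both truncate 1.5h down to 1.
	ageB := float64(time.Since(partitionTime) / time.Hour)

	fmt.Println(ageA, ageB) // 1 1
}
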

View file

@@ -119,8 +119,6 @@ func emitGCMetrics(stats *storeStats) {
func getDeltaStats(beforeStats *storeStats, afterStats *storeStats) *storeStats {
ret := &storeStats{}
ret.LevelToKeyCount = make(map[int]uint64)
ret.LevelToTableCount = make(map[int]int)
for k, v := range beforeStats.LevelToKeyCount {
metricCleanedBadgerKeys.WithLabelValues(fmt.Sprintf("%v", k)).Set(float64(v) - float64(afterStats.LevelToKeyCount[k]))

View file

@@ -20,14 +20,20 @@ import (
)
var (
metricGcRunCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_run_count"})
metricGcSuccessCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_success_count"})
metricGcFailedCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_failed_count"})
metricGcLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_latency_sec"})
metricGcRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_running"})
metricValueLogGcRunCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_valueloggc_run_count"})
metricValueLogGcLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueloggc_latency_sec"})
metricValueLogGcRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueloggc_running"})
metricGcRunCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_run_count"})
metricGcSuccessCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_success_count"})
metricGcFailedCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_gc_failed_count"})
metricGcLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_latency_sec"})
metricGcRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_running"})
metricGcCleanUpPerformed = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_cleanup_performed"})
metricGcDeletedNumberOfKeys = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_deleted_num_of_keys"})
metricGcNumberOfKeysToDelete = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_num_of_keys_to_delete"})
metricGcDeletedNumberOfKeysByTable = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "sloop_deleted_keys_by_table"}, []string{"table"})
metricAgeOfMinimumPartition = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_age_of_minimum_partition_hr"})
metricAgeOfMaximumPartition = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_gc_age_of_maximum_partition_hr"})
metricValueLogGcRunCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_valueLoggc_run_count"})
metricValueLogGcLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueLoggc_latency_sec"})
metricValueLogGcRunning = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_valueLoggc_running"})
)
type Config struct {
@@ -37,6 +43,7 @@ type Config struct {
SizeLimitBytes int
BadgerDiscardRatio float64
BadgerVLogGCFreq time.Duration
DeletionBatchSize int
}
type StoreManager struct {
@@ -89,7 +96,10 @@ func (sm *StoreManager) gcLoop() {
metricGcRunCount.Inc()
before := time.Now()
metricGcRunning.Set(1)
_, err := doCleanup(sm.tables, sm.config.TimeLimit, sm.config.SizeLimitBytes, sm.stats)
cleanUpPerformed, numOfDeletedKeys, numOfKeysToDelete, err := doCleanup(sm.tables, sm.config.TimeLimit, sm.config.SizeLimitBytes, sm.stats, sm.config.DeletionBatchSize)
metricGcCleanUpPerformed.Set(boolToFloat(cleanUpPerformed))
metricGcDeletedNumberOfKeys.Set(numOfDeletedKeys)
metricGcNumberOfKeysToDelete.Set(numOfKeysToDelete)
metricGcRunning.Set(0)
if err == nil {
metricGcSuccessCount.Inc()
@@ -97,7 +107,7 @@ func (sm *StoreManager) gcLoop() {
metricGcFailedCount.Inc()
}
metricGcLatency.Set(time.Since(before).Seconds())
glog.Infof("GC finished in %v with return %q. Next run in %v", time.Since(before), err, sm.config.Freq)
glog.Infof("GC finished in %v with error '%v'. Next run in %v", time.Since(before), err, sm.config.Freq)
var afterGCEnds = sm.refreshStats()
var deltaStats = getDeltaStats(beforeGCStats, afterGCEnds)
@@ -158,42 +168,58 @@ func (sm *StoreManager) refreshStats() *storeStats {
return sm.stats
}
func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int, stats *storeStats) (bool, error) {
func doCleanup(tables typed.Tables, timeLimit time.Duration, sizeLimitBytes int, stats *storeStats, deletionBatchSize int) (bool, float64, float64, error) {
ok, minPartition, maxPartition, err := tables.GetMinAndMaxPartition()
if err != nil {
return false, fmt.Errorf("failed to get min partition : %s, max partition: %s, err:%v", minPartition, maxPartition, err)
return false, 0, 0, fmt.Errorf("failed to get min partition : %s, max partition: %s, err:%v", minPartition, maxPartition, err)
}
if !ok {
return false, nil
return false, 0, 0, nil
}
var totalNumOfDeletedKeys float64 = 0
var totalNumOfKeysToDelete float64 = 0
anyCleanupPerformed := false
if cleanUpTimeCondition(minPartition, maxPartition, timeLimit) || cleanUpFileSizeCondition(stats, sizeLimitBytes) {
partStart, partEnd, err := untyped.GetTimeRangeForPartition(minPartition)
glog.Infof("GC removing partition %q with data from %v to %v (err %v)", minPartition, partStart, partEnd, err)
var errMsgs []string
minPartitionAge := 0.0
minPartitionAge, err = untyped.GetAgeOfPartitionInHours(minPartition)
if err != nil {
metricAgeOfMinimumPartition.Set(minPartitionAge)
}
maxPartitionAge, err := untyped.GetAgeOfPartitionInHours(maxPartition)
if err != nil {
metricAgeOfMaximumPartition.Set(maxPartitionAge)
}
var errMessages []string
for _, tableName := range tables.GetTableNames() {
prefix := fmt.Sprintf("/%s/%s", tableName, minPartition)
start := time.Now()
err = tables.Db().DropPrefix([]byte(prefix))
err, numOfDeletedKeys, numOfKeysToDelete := untyped.DeleteKeysWithPrefix([]byte(prefix), tables.Db(), deletionBatchSize)
metricGcDeletedNumberOfKeysByTable.WithLabelValues(fmt.Sprintf("%v", tableName)).Set(numOfDeletedKeys)
totalNumOfDeletedKeys += numOfDeletedKeys
totalNumOfKeysToDelete += numOfKeysToDelete
elapsed := time.Since(start)
glog.Infof("Call to DropPrefix(%v) took %v and returned %v", prefix, elapsed, err)
glog.Infof("Call to DeleteKeysWithPrefix(%v) took %v and removed %f keys with error: %v", prefix, elapsed, numOfDeletedKeys, err)
if err != nil {
errMsgs = append(errMsgs, fmt.Sprintf("failed to cleanup with min key: %s, elapsed: %v,err: %v,", prefix, elapsed, err))
errMessages = append(errMessages, fmt.Sprintf("failed to cleanup with min key: %s, elapsed: %v,err: %v,", prefix, elapsed, err))
}
anyCleanupPerformed = true
}
if len(errMsgs) != 0 {
if len(errMessages) != 0 {
var errMsg string
for _, er := range errMsgs {
for _, er := range errMessages {
errMsg += er + ","
}
return false, fmt.Errorf(errMsg)
return false, totalNumOfDeletedKeys, totalNumOfKeysToDelete, fmt.Errorf(errMsg)
}
}
return anyCleanupPerformed, nil
return anyCleanupPerformed, totalNumOfDeletedKeys, totalNumOfKeysToDelete, nil
}
func cleanUpTimeCondition(minPartition string, maxPartition string, timeLimit time.Duration) bool {
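
doCleanup now also reports how many keys were queued for deletion and how many were actually deleted, and gcLoop feeds those counts into the new gauges. A hypothetical test sketch, not part of this commit, showing how the gauges could be asserted with client_golang's testutil helpers (the "eventcount" label value is made up for illustration):

package storemanager

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

// Sets the new GC gauges the way gcLoop does and reads them back.
func Test_GcMetrics_Sketch(t *testing.T) {
	metricGcCleanUpPerformed.Set(boolToFloat(true))
	metricGcDeletedNumberOfKeys.Set(42)
	metricGcNumberOfKeysToDelete.Set(42)
	metricGcDeletedNumberOfKeysByTable.WithLabelValues("eventcount").Set(10)

	assert.Equal(t, float64(1), testutil.ToFloat64(metricGcCleanUpPerformed))
	assert.Equal(t, float64(42), testutil.ToFloat64(metricGcDeletedNumberOfKeys))
	assert.Equal(t, float64(42), testutil.ToFloat64(metricGcNumberOfKeysToDelete))
	assert.Equal(t, float64(10), testutil.ToFloat64(metricGcDeletedNumberOfKeysByTable.WithLabelValues("eventcount")))
}
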

View file

@@ -122,7 +122,7 @@ func Test_doCleanup_true(t *testing.T) {
DiskSizeBytes: 10,
}
flag, err := doCleanup(tables, time.Hour, 2, stats)
flag, _, _, err := doCleanup(tables, time.Hour, 2, stats, 10)
assert.True(t, flag)
assert.Nil(t, err)
}
@@ -135,7 +135,7 @@ func Test_doCleanup_false(t *testing.T) {
DiskSizeBytes: 10,
}
flag, err := doCleanup(tables, time.Hour, 1000, stats)
flag, _, _, err := doCleanup(tables, time.Hour, 1000, stats, 10)
assert.False(t, flag)
assert.Nil(t, err)
}

View file

@@ -0,0 +1,8 @@
package storemanager
func boolToFloat(value bool) float64 {
if value {
return 1
}
return 0
}

View file

@@ -0,0 +1,11 @@
package storemanager
import (
"github.com/stretchr/testify/assert"
"testing"
)
func Test_boolToFloat(t *testing.T) {
assert.Equal(t, float64(1), boolToFloat(true))
assert.Equal(t, float64(0), boolToFloat(false))
}

View file

@@ -62,7 +62,7 @@ type WebConfig struct {
var (
metricWebServerRequestCount = promauto.NewCounter(prometheus.CounterOpts{Name: "sloop_webserver_request_count"})
metricWebServerRequestLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_webserver_request_latency"})
metricWebServerRequestLatency = promauto.NewGauge(prometheus.GaugeOpts{Name: "sloop_webserver_request_latency_sec"})
)
// This is not going to change and we don't want to pass it to every function