Merged PR 94: Divide txPreplay lock

This commit is contained in:
Runhuai Li 2020-06-22 07:08:05 +00:00 коммит произвёл Yang Chen (MSR)
Родитель 8e9bbdc4e4
Коммит 93e5efbea5
24 изменённых файлов: 802 добавлений и 533 удалений

Просмотреть файл

@ -226,7 +226,7 @@ var (
utils.GenerateEmulatorLogFromFlag,
utils.GenerateEmulatorLogToFlag,
utils.ParallelizeReuseFlag,
utils.CalWarmupMissFlag,
utils.WarmupMissDetailFlag,
utils.ReportMissDetailFlag,
utils.ReuseTracerCheckFlag,
alliedNodeFileFlag,

Просмотреть файл

@ -907,9 +907,9 @@ var (
Name: "parallelizereuse",
Usage: "Parallelize reuse and real apply when apply (enable only when reuse mode)",
}
CalWarmupMissFlag = cli.BoolFlag{
Name: "calwarmupmiss",
Usage: "Calculate warmup miss count for address and key",
WarmupMissDetailFlag = cli.BoolFlag{
Name: "warmupmissdetail",
Usage: "Classify warmup miss of address and key by detail",
}
ReportMissDetailFlag = cli.BoolFlag{
Name: "reportmissdetail",
@ -1779,7 +1779,7 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
EmulateFile: "my.json",
ParallelizeReuse: ctx.GlobalBool(ParallelizeReuseFlag.Name),
CalWarmupMiss: ctx.GlobalBool(CalWarmupMissFlag.Name),
WarmupMissDetail: ctx.GlobalBool(WarmupMissDetailFlag.Name),
ReportMissDetail: ctx.GlobalBool(ReportMissDetailFlag.Name),
}
if ctx.GlobalIsSet(CmpReuseLogDirFlag.Name) {

Просмотреть файл

@ -45,8 +45,7 @@ func (reuse *Cmpreuse) setAllResult(reuseStatus *cmptypes.ReuseStatus, curRoundI
if txPreplay == nil {
txPreplay = reuse.addNewTx(tx, rwrecord)
}
txPreplay.Mu.Lock()
defer txPreplay.Mu.Unlock()
start := time.Now()
// Generally, there are three scenarios : 1. NoHit 2. DepHit 3. DetailHit (Hit but not DepHit)
@ -70,35 +69,51 @@ func (reuse *Cmpreuse) setAllResult(reuseStatus *cmptypes.ReuseStatus, curRoundI
}
var err error
if txPreplay.PreplayResults.IsExternalTransfer && !cache.IsExternalTransfer(rwrecord.ReadDetail.ReadDetailSeq){
txPreplay.PreplayResults.MixTreeMu.Lock()
if txPreplay.PreplayResults.IsExternalTransfer && !cache.IsExternalTransfer(rwrecord.ReadDetail.ReadDetailSeq) {
// if isExternalTransfer is changed from true to false, only set dep in mix tree:
err = reuse.setMixTreeWithOnlyDependence(txPreplay, round)
}else{
} else {
err = reuse.setMixTree(tx, txPreplay, round)
}
txPreplay.PreplayResults.MixTreeMu.Unlock()
if err != nil {
roundjs, _ := json.Marshal(round)
log.Error("setMixTree error", "round", string(roundjs))
panic(err.Error())
}
if reuseStatus.BaseStatus != cmptypes.Hit {
txPreplay.PreplayResults.RWRecordTrieMu.Lock()
err = reuse.setRWRecordTrie(txPreplay, round, curBlockNumber)
txPreplay.PreplayResults.RWRecordTrieMu.Unlock()
if err != nil {
roundjs, _ := json.Marshal(round)
log.Error("setRWRecordTrie error", "round", string(roundjs))
panic(err.Error())
}
if txPreplay.PreplayResults.IsExternalTransfer {
txPreplay.PreplayResults.DeltaTreeMu.Lock()
err = reuse.setDeltaTree(tx, txPreplay, round, curBlockNumber)
txPreplay.PreplayResults.DeltaTreeMu.Unlock()
if err != nil {
roundjs, _ := json.Marshal(round)
log.Error("setDeltaTree error", "round", string(roundjs))
panic(err.Error())
}
}
if trace != nil {
traceTrieStart := time.Now()
txPreplay.PreplayResults.TraceTrieMu.Lock()
reuse.setTraceTrie(tx, txPreplay, round, trace)
txPreplay.PreplayResults.TraceTrieMu.Unlock()
cost := time.Since(traceTrieStart)
if cost > 14*time.Second {
log.Warn("Slow setTraceTrie", "txHash", txHash.Hex(), "Seconds", cost,
@ -122,7 +137,6 @@ func (reuse *Cmpreuse) setRWRecordTrie(txPreplay *cache.TxPreplay, round *cache.
return InsertRecord(trie, round, curBlockNumber)
}
func (reuse *Cmpreuse) setMixTreeWithOnlyDependence(txPreplay *cache.TxPreplay, round *cache.PreplayResult) error {
return InsertAccDep(txPreplay.PreplayResults.MixTree, round)
}

Просмотреть файл

@ -217,9 +217,9 @@ type MixHitType int
const (
AllDepHit MixHitType = iota
AllDetailHit
PartialHit
PartialHit // partial dep and partial detail hit
AllDeltaHit
PartialDeltaHit
PartialDeltaHit // partial dep and partial delta hit
NotMixHit
)
@ -231,6 +231,10 @@ func (m MixHitType) String() string {
return "AllDetail"
case PartialHit:
return "Partial"
case AllDeltaHit:
return "AllDelta"
case PartialDeltaHit:
return "PartialDelta"
case NotMixHit:
return "NotMix"
}
@ -240,7 +244,7 @@ func (m MixHitType) String() string {
type MissType int
const (
NoInMiss MissType = iota
TraceMiss MissType = iota
NoMatchMiss
)

Просмотреть файл

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/optipreplayer/cache"
"github.com/subchen/go-trylock/v2"
"math/big"
"time"
)
@ -389,12 +390,9 @@ func MixApplyObjState(statedb *state.StateDB, rw *cache.RWRecord, wobjectRefs ca
}
}
func (reuse *Cmpreuse) mixCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB,
header *types.Header, blockPre *cache.BlockPre, abort func() bool, isBlockProcess bool, cmpReuseChecking bool) (
*cache.PreplayResult, *cmptypes.MixHitStatus, *cmptypes.PreplayResTrieNode, interface{}, bool, bool) {
//txPreplay.Mu.Lock()
//defer txPreplay.Mu.Unlock()
func (reuse *Cmpreuse) mixCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB, header *types.Header, abort func() bool,
blockPre *cache.BlockPre, isBlockProcess bool, cmpReuseChecking bool) (*cache.PreplayResult, *cmptypes.MixHitStatus, *cmptypes.PreplayResTrieNode,
interface{}, bool, bool) {
trie := txPreplay.PreplayResults.MixTree
res, mixHitStatus, missNode, missValue, isAbort, ok := SearchMixTree(trie, statedb, bc, header, abort, false, isBlockProcess, txPreplay.PreplayResults.IsExternalTransfer)
if ok {
@ -471,9 +469,9 @@ func (reuse *Cmpreuse) mixCheck(txPreplay *cache.TxPreplay, bc core.ChainContext
}
}
func (reuse *Cmpreuse) depCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB,
header *types.Header, blockPre *cache.BlockPre, abort func() bool, isBlockProcess bool, cfg *vm.Config) (
res *cache.PreplayResult, isAbort bool, ok bool) {
// Deprecated
func (reuse *Cmpreuse) depCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB, header *types.Header, abort func() bool,
blockPre *cache.BlockPre, isBlockProcess bool, cfg *vm.Config) (res *cache.PreplayResult, isAbort bool, ok bool) {
//txPreplay.Mu.Lock()
//defer txPreplay.Mu.Unlock()
@ -513,8 +511,8 @@ func (reuse *Cmpreuse) depCheck(txPreplay *cache.TxPreplay, bc core.ChainContext
log.Info("show depcheck match read dep", "readaddress", addr, "lastTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
preTxPreplay := reuse.MSRACache.PeekTxPreplay(*preTxResId.Txhash)
if preTxPreplay != nil {
preTxRound, okk := preTxPreplay.GetRound(preTxResId.RoundID)
if okk {
preTxPreplay.RLockRound()
if preTxRound, okk := preTxPreplay.PeekRound(preTxResId.RoundID); okk {
pretxbs, _ := json.Marshal(preTxRound)
//if jerr != nil {
log.Info("@@@@@@ pre tx info", "pretxRound", string(pretxbs))
@ -522,6 +520,7 @@ func (reuse *Cmpreuse) depCheck(txPreplay *cache.TxPreplay, bc core.ChainContext
} else {
log.Warn("no this tx round")
}
preTxPreplay.RUnlockRound()
} else {
log.Warn("no this tx preplay")
}
@ -585,7 +584,7 @@ func (reuse *Cmpreuse) depCheck(txPreplay *cache.TxPreplay, bc core.ChainContext
}
func (reuse *Cmpreuse) trieCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB, header *types.Header, abort func() bool,
blockPre *cache.BlockPre, isBlockProcess bool, cfg *vm.Config) (res *cache.PreplayResult, isAbort bool, ok bool) {
blockPre *cache.BlockPre, isBlockProcess bool, cfg *vm.Config) (*cache.PreplayResult, bool, bool) {
trie := txPreplay.PreplayResults.RWRecordTrie
trieNode, isAbort, ok := SearchTree(trie, statedb, bc, header, abort, false)
@ -621,8 +620,8 @@ func (reuse *Cmpreuse) trieCheck(txPreplay *cache.TxPreplay, bc core.ChainContex
log.Info("show depcheck match read dep", "readaddress", addr, "lastTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
preTxPreplay := reuse.MSRACache.PeekTxPreplay(*preTxResId.Txhash)
if preTxPreplay != nil {
preTxRound, okk := preTxPreplay.GetRound(preTxResId.RoundID)
if okk {
preTxPreplay.RLockRound()
if preTxRound, okk := preTxPreplay.PeekRound(preTxResId.RoundID); okk {
pretxbs, _ := json.Marshal(preTxRound)
//if jerr != nil {
log.Info("@@@@@@ pre tx info", "pretxRound", string(pretxbs))
@ -630,6 +629,7 @@ func (reuse *Cmpreuse) trieCheck(txPreplay *cache.TxPreplay, bc core.ChainContex
} else {
log.Warn("no this tx round")
}
preTxPreplay.RUnlockRound()
} else {
log.Warn("no this tx preplay")
}
@ -692,10 +692,10 @@ func (reuse *Cmpreuse) trieCheck(txPreplay *cache.TxPreplay, bc core.ChainContex
return nil, isAbort, false
}
func (reuse *Cmpreuse) deltaCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, db *state.StateDB, header *types.Header, abort func() bool,
func (reuse *Cmpreuse) deltaCheck(txPreplay *cache.TxPreplay, bc core.ChainContext, statedb *state.StateDB, header *types.Header, abort func() bool,
blockPre *cache.BlockPre) (res *cache.PreplayResult, isAbort bool, ok bool) {
trie := txPreplay.PreplayResults.DeltaTree
trieNode, isAbort, ok := SearchTree(trie, db, bc, header, abort, false)
trieNode, isAbort, ok := SearchTree(trie, statedb, bc, header, abort, false)
if ok {
res := trieNode.Round.(*cache.PreplayResult)
if blockPre != nil && blockPre.ListenTimeNano < res.TimestampNano {
@ -706,8 +706,8 @@ func (reuse *Cmpreuse) deltaCheck(txPreplay *cache.TxPreplay, bc core.ChainConte
return nil, isAbort, false
}
func (reuse *Cmpreuse) traceCheck(txPreplay *cache.TxPreplay, db *state.StateDB, header *types.Header, getHashFunc vm.GetHashFunc, precompiles map[common.Address]vm.PrecompiledContract, abort func() bool,
blockPre *cache.BlockPre, isBlockProcess bool, tracerCheck bool) *TraceTrieSearchResult {
func (reuse *Cmpreuse) traceCheck(txPreplay *cache.TxPreplay, statedb *state.StateDB, header *types.Header, abort func() bool, isBlockProcess bool,
getHashFunc vm.GetHashFunc, precompiles map[common.Address]vm.PrecompiledContract, tracerCheck bool) *TraceTrieSearchResult {
if !isBlockProcess {
return nil
} else {
@ -722,7 +722,7 @@ func (reuse *Cmpreuse) traceCheck(txPreplay *cache.TxPreplay, db *state.StateDB,
}
if trie != nil {
sr, _, _ := trie.SearchTraceTrie(db, header, getHashFunc, precompiles, abort, tracerCheck, reuse.MSRACache)
sr, _, _ := trie.SearchTraceTrie(statedb, header, getHashFunc, precompiles, abort, tracerCheck, reuse.MSRACache)
return sr
}
return nil
@ -857,54 +857,59 @@ func (reuse *Cmpreuse) reuseTransaction(bc core.ChainContext, author *common.Add
return
}
if isBlockProcess {
if !txPreplay.Mu.TryLockTimeout(0) {
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TxPreplayLock} // return because txPreplay is locked
var lockCount int
var tryHoldLock = func(mu trylock.TryLocker) (hold bool) {
if isBlockProcess {
if !mu.RTryLockTimeout(0) {
lockCount++
return false
}
} else {
mu.RLock()
}
return true
}
if tryHoldLock(txPreplay.PreplayResults.MixTreeMu) {
round, mixStatus, missNode, missValue, isAbort, ok = reuse.mixCheck(txPreplay, bc, statedb, header, abort, blockPre, isBlockProcess, cfg.MSRAVMSettings.CmpReuseChecking)
txPreplay.PreplayResults.MixTreeMu.RUnlock()
if ok {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.MixHit, MixHitStatus: mixStatus}
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.MixCheck} // abort before hit or miss
return
}
} else {
txPreplay.Mu.Lock()
}
defer txPreplay.Mu.Unlock()
round, mixStatus, missNode, missValue, isAbort, ok = reuse.mixCheck(txPreplay, bc, statedb, header, blockPre, abort, isBlockProcess, cfg.MSRAVMSettings.CmpReuseChecking)
if ok {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.MixHit, MixHitStatus: mixStatus}
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.MixCheck} // abort before hit or miss
return
cache.LockCount[0] ++
}
if status == nil {
if isBlockProcess {
if status == nil {
if tryHoldLock(txPreplay.PreplayResults.TraceTrieMu) {
sr = reuse.traceCheck(txPreplay, statedb, header, abort, isBlockProcess, getHashFunc, precompiles, cfg.MSRAVMSettings.CmpReuseChecking)
txPreplay.PreplayResults.TraceTrieMu.RUnlock()
if isBlockProcess {
sr = reuse.traceCheck(txPreplay, statedb, header, getHashFunc, precompiles, abort, blockPre, isBlockProcess, cfg.MSRAVMSettings.CmpReuseChecking)
if sr != nil && sr.hit {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.TraceHit,
TraceHitStatus: sr.TraceHitStatus}
round = sr.GetAnyRound()
} else if sr != nil && sr.aborted {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TraceCheck}
return
if sr != nil && sr.hit {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.TraceHit, TraceHitStatus: sr.TraceHitStatus}
round = sr.GetAnyRound()
} else if sr != nil && sr.aborted {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TraceCheck}
return
} else if sr != nil {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Miss, MissType: cmptypes.TraceMiss, MissNode: missNode, MissValue: missValue}
return
}
} else {
cache.LockCount[1] ++
}
}
}
if status == nil {
if sr != nil {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Miss, MissType: cmptypes.NoMatchMiss,
MissNode: missNode, MissValue: missValue}
return
}
}
if status == nil {
// Think about this scenario
// 1. Tx1 is preplay in round 1
// 2. In round 2, tx1 is reused with delta in round 1
@ -914,128 +919,147 @@ func (reuse *Cmpreuse) reuseTransaction(bc core.ChainContext, author *common.Add
//
// I think NO, that would be useless
//
//The delta reuse should be banned when preplaying, because:
// The delta reuse should be banned when preplaying, because:
// 1. Reusing delta would reduce variety of dep relations and reduce the hit rate of dep match
// 2. Delta reuse will be still helpful for blockprocessing
if isBlockProcess {
round, isAbort, ok = reuse.deltaCheck(txPreplay, bc, statedb, header, abort, blockPre)
if status == nil {
if tryHoldLock(txPreplay.PreplayResults.DeltaTreeMu) {
round, isAbort, ok = reuse.deltaCheck(txPreplay, bc, statedb, header, abort, blockPre)
txPreplay.PreplayResults.DeltaTreeMu.RUnlock()
if ok {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.DeltaHit, MixHitStatus: mixStatus}
// debug info
//rss, _ := json.Marshal(status)
//log.Info("reuse delta", "txhash", tx.Hash().Hex(), "status", string(rss))
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.DeltaCheck} // abort before hit or miss
return
}
} else {
cache.LockCount[2] ++
}
}
}
if status == nil {
if tryHoldLock(txPreplay.PreplayResults.RWRecordTrieMu) {
round, isAbort, ok = reuse.trieCheck(txPreplay, bc, statedb, header, abort, blockPre, isBlockProcess, cfg)
txPreplay.PreplayResults.RWRecordTrieMu.RUnlock()
if ok {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.DeltaHit, MixHitStatus: mixStatus}
// debug info
//rss, _ := json.Marshal(status)
//log.Info("reuse delta", "txhash", tx.Hash().Hex(), "status", string(rss))
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.DeltaCheck} // abort before hit or miss
return
}
}
}
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.TrieHit, MixHitStatus: mixStatus}
// why trie hit instead of mixhit:
// over matching dep leads to a wrong way; the round found by trie might miss all deps
if false && isBlockProcess {
mixbs, _ := json.Marshal(mixStatus)
roundbs, _ := json.Marshal(round)
log.Warn("Mixhit miss !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "tx", txPreplay.TxHash.Hex(), "mixstatus", string(mixbs), "round", string(roundbs))
txPreplay.PreplayResults.MixTreeMu.RLock()
SearchMixTree(txPreplay.PreplayResults.MixTree, statedb, bc, header, func() bool { return false }, true, isBlockProcess, txPreplay.PreplayResults.IsExternalTransfer)
txPreplay.PreplayResults.MixTreeMu.RUnlock()
log.Warn(". . . . . . . . . . . . . . . . . . . . . . . . . ")
SearchTree(txPreplay.PreplayResults.RWRecordTrie, statedb, bc, header, func() bool { return false }, true)
formertxsbs, _ := json.Marshal(statedb.ProcessedTxs)
log.Warn("former tx", "former txs", string(formertxsbs))
if status == nil {
round, isAbort, ok = reuse.trieCheck(txPreplay, bc, statedb, header, abort, blockPre, isBlockProcess, cfg)
if ok {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Hit, HitType: cmptypes.TrieHit, MixHitStatus: mixStatus}
// why trie hit instead of mixhit:
// over matching dep leads to a wrong way; the round found by trie might miss all deps
if false && isBlockProcess {
mixbs, _ := json.Marshal(mixStatus)
roundbs, _ := json.Marshal(round)
log.Warn("Mixhit miss !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", "tx", txPreplay.TxHash.Hex(), "mixstatus", string(mixbs), "round", string(roundbs))
SearchMixTree(txPreplay.PreplayResults.MixTree, statedb, bc, header, func() bool { return false }, true, isBlockProcess, txPreplay.PreplayResults.IsExternalTransfer)
log.Warn(". . . . . . . . . . . . . . . . . . . . . . . . . ")
SearchTree(txPreplay.PreplayResults.RWRecordTrie, statedb, bc, header, func() bool { return false }, true)
formertxsbs, _ := json.Marshal(statedb.ProcessedTxs)
log.Warn("former tx", "former txs", string(formertxsbs))
depMatch := true
for _, adv := range round.ReadDepSeq {
if adv.AddLoc.Field != cmptypes.Dependence {
continue
}
addr := adv.AddLoc.Address
preTxResId := adv.Value.(*cmptypes.ChangedBy).LastTxResID
if preTxResId == nil {
log.Info("show depcheck match read dep (no tx changed)", "readaddress", addr)
} else {
log.Info("show depcheck match read dep", "readaddress", addr, "lastTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
preTxPreplay := reuse.MSRACache.PeekTxPreplay(*preTxResId.Txhash)
if preTxPreplay != nil {
preTxRound, okk := preTxPreplay.PeekRound(preTxResId.RoundID)
if okk {
pretxbs, _ := json.Marshal(preTxRound)
//if jerr != nil {
log.Info("@@@@@@ pre tx info", "pretxRound", string(pretxbs))
//}
depMatch := true
for _, adv := range round.ReadDepSeq {
if adv.AddLoc.Field != cmptypes.Dependence {
continue
}
addr := adv.AddLoc.Address
preTxResId := adv.Value.(*cmptypes.ChangedBy).LastTxResID
if preTxResId == nil {
log.Info("show depcheck match read dep (no tx changed)", "readaddress", addr)
} else {
log.Info("show depcheck match read dep", "readaddress", addr, "lastTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
preTxPreplay := reuse.MSRACache.PeekTxPreplay(*preTxResId.Txhash)
if preTxPreplay != nil {
preTxPreplay.RLockRound()
if preTxRound, okk := preTxPreplay.PeekRound(preTxResId.RoundID); okk {
pretxbs, _ := json.Marshal(preTxRound)
//if jerr != nil {
log.Info("@@@@@@ pre tx info", "pretxRound", string(pretxbs))
//}
} else {
log.Warn("no this tx round")
}
preTxPreplay.RUnlockRound()
} else {
log.Warn("no this tx round")
log.Warn("no this tx preplay")
}
}
newTxResId := statedb.GetTxDepByAccount(addr).LastTxResID
if newTxResId == nil {
log.Info("show read dep (no tx changed)", "readaddress", addr)
} else {
log.Info("show read dep", "readaddress", addr, "lastTxhash", newTxResId.Txhash.Hex(), "preRoundID", newTxResId.RoundID)
}
if preTxResId == nil {
if newTxResId == nil {
continue
} else {
depMatch = false
log.Warn("preTxResId is nil and newTxResId is not", "curHash", newTxResId.Hash(),
"curTxhash", newTxResId.Txhash.Hex(), "curRoundID", newTxResId.RoundID)
continue
}
} else {
log.Warn("no this tx preplay")
}
if newTxResId == nil {
depMatch = false
}
newTxResId := statedb.GetTxDepByAccount(addr).LastTxResID
if newTxResId == nil {
log.Info("show read dep (no tx changed)", "readaddress", addr)
} else {
log.Info("show read dep", "readaddress", addr, "lastTxhash", newTxResId.Txhash.Hex(), "preRoundID", newTxResId.RoundID)
}
if preTxResId == nil {
if newTxResId == nil {
continue
log.Warn("newTxResId is nil and preTxResId is not", "preHash", preTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
continue
}
}
if *preTxResId.Hash() == *newTxResId.Hash() {
if !(preTxResId.Txhash.Hex() == newTxResId.Txhash.Hex() && preTxResId.RoundID == newTxResId.RoundID) {
depMatch = false
log.Warn("!! TxResID hash conflict: hash same; content diff", "preHash", preTxResId.Hash(), "curHash", newTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
}
} else {
depMatch = false
log.Warn("preTxResId is nil and newTxResId is not", "curHash", newTxResId.Hash(),
"curTxhash", newTxResId.Txhash.Hex(), "curRoundID", newTxResId.RoundID)
continue
if preTxResId.Txhash.Hex() == newTxResId.Txhash.Hex() && preTxResId.RoundID == newTxResId.RoundID {
log.Warn("!!!!! TxResID hash : hash diff; content same", "preHash", preTxResId.Hash(), "curHash", newTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
} else {
log.Warn("!!>> read dep hash and content diff <<!!", "preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(),
"preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
}
}
} else {
if newTxResId == nil {
depMatch = false
}
log.Warn("newTxResId is nil and preTxResId is not", "preHash", preTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID)
continue
}
}
if *preTxResId.Hash() == *newTxResId.Hash() {
if !(preTxResId.Txhash.Hex() == newTxResId.Txhash.Hex() && preTxResId.RoundID == newTxResId.RoundID) {
depMatch = false
log.Warn("!! TxResID hash conflict: hash same; content diff", "preHash", preTxResId.Hash(), "curHash", newTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
}
} else {
depMatch = false
if preTxResId.Txhash.Hex() == newTxResId.Txhash.Hex() && preTxResId.RoundID == newTxResId.RoundID {
log.Warn("!!!!! TxResID hash : hash diff; content same", "preHash", preTxResId.Hash(), "curHash", newTxResId.Hash(),
"preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(), "preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
} else {
log.Warn("!!>> read dep hash and content diff <<!!", "preTxhash", preTxResId.Txhash.Hex(), "curTxhash", newTxResId.Txhash.Hex(),
"preRoundID", preTxResId.RoundID, "curRoundID", newTxResId.RoundID)
}
}
log.Warn("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++", "depmatch", depMatch)
//panic("unsupposed match by trie hit")
}
log.Warn("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++", "depmatch", depMatch)
//panic("unsupposed match by trie hit")
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TrieCheck} // abort before hit or miss
return
}
} else if isAbort {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TrieCheck} // abort before hit or miss
return
} else {
cache.LockCount[3] ++
}
}
if status == nil {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Miss, MissType: cmptypes.NoMatchMiss,
MissNode: missNode, MissValue: missValue}
return
if isBlockProcess && lockCount == 4 {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Unknown, AbortStage: cmptypes.TxPreplayLock}
return
} else {
d0 = time.Since(t0)
status = &cmptypes.ReuseStatus{BaseStatus: cmptypes.Miss, MissType: cmptypes.NoMatchMiss, MissNode: missNode, MissValue: missValue}
return
}
}
if status.BaseStatus != cmptypes.Hit {

Просмотреть файл

@ -1856,14 +1856,19 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
if statedb == nil {
statedb, err = state.New(parent.Root, bc.stateCache)
} else {
if bc.vmConfig.MSRAVMSettings.CalWarmupMiss {
statedb.CalWarmupMiss = true
statedb.AddrWarmupHelpless = make(map[common.Address]struct{})
if pair := statedb.GetPair(); pair != nil {
pair.CalWarmupMiss = true
pair.AddrWarmupHelpless = make(map[common.Address]struct{})
}
if bc.vmConfig.MSRAVMSettings.WarmupMissDetail {
statedb.ProcessedForDb, statedb.ProcessedForObj = bc.Warmuper.GetProcessed(parent.Root)
statedb.CalWarmupMiss = true
statedb.AddrWarmupHelpless = make(map[common.Address]struct{})
statedb.WarmupMissDetail = true
if pair := statedb.GetPair(); pair != nil {
pair.ProcessedForDb, pair.ProcessedForObj = bc.Warmuper.GetProcessed(parent.Root)
pair.CalWarmupMiss = true
pair.AddrWarmupHelpless = make(map[common.Address]struct{})
pair.WarmupMissDetail = true
}
}
}
@ -2034,7 +2039,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
atomic.StoreUint32(&followupInterrupt, 1)
if !bc.vmConfig.MSRAVMSettings.Silent {
bc.MSRACache.InfoPrint(block, bc.vmConfig, bc.MSRACache.Synced(), bc.ReportReuseMiss, statedb)
bc.MSRACache.InfoPrint(block, types.NewEIP155Signer(bc.chainConfig.ChainID), bc.vmConfig, bc.MSRACache.Synced(), bc.ReportReuseMiss, statedb)
}
bc.MSRACache.Continue()
bc.Warmuper.Continue()

Просмотреть файл

@ -21,6 +21,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
// ListenTxsEvent is posted when a batch of transactions are listened.
type ListenTxsEvent struct{ Txs []*types.Transaction }
// NewTxsEvent is posted when a batch of transactions enter the transaction pool.
type NewTxsEvent struct{ Txs []*types.Transaction }

Просмотреть файл

@ -289,7 +289,7 @@ func (s *stateObject) GetCommittedState(db Database, key common.Hash) common.Has
value.SetBytes(content)
if s.db.CalWarmupMiss {
s.db.KeyWarmupMiss++
if !s.db.IsKeyWarmup(s.address, key) {
if s.db.WarmupMissDetail && !s.db.IsKeyWarmup(s.address, key) {
s.db.KeyNoWarmup++
}
}

Просмотреть файл

@ -76,10 +76,10 @@ func newDeltaDB() *deltaDB {
}
type TxPerfAndStatus struct {
Receipt *types.Receipt
Time time.Duration
Receipt *types.Receipt
Time time.Duration
ReuseStatus *cmptypes.ReuseStatus
Delay float64
Delay float64
}
// StateDBs within the ethereum protocol are used to store anything
@ -167,6 +167,8 @@ type StateDB struct {
ProcessedForDb map[common.Address]map[common.Hash]struct{}
ProcessedForObj map[common.Address]map[common.Hash]struct{}
CalWarmupMiss bool
WarmupMissDetail bool
AccountCreate int
AddrWarmupMiss int
AddrNoWarmup int
AddrWarmupHelpless map[common.Address]struct{}
@ -177,6 +179,11 @@ type StateDB struct {
UnknownTxs []*types.Transaction
UnknownTxReceipts []*types.Receipt
TxPerfs []*TxPerfAndStatus
//Time consumption detail in finalize
waitUpdateRoot time.Duration
updateObj time.Duration
hashTrie time.Duration
}
func (self *StateDB) AddTxPerf(receipt *types.Receipt, time time.Duration, status *cmptypes.ReuseStatus, delayInSecond float64) {
@ -943,7 +950,7 @@ func (s *StateDB) getDeletedStateObject(addr common.Address) *stateObject {
}
if s.CalWarmupMiss {
s.AddrWarmupMiss++
if !s.IsAddrWarmup(addr) {
if s.WarmupMissDetail && !s.IsAddrWarmup(addr) {
s.AddrNoWarmup++
}
}
@ -996,6 +1003,7 @@ func (s *StateDB) createObject(addr common.Address) (newobj, prev *stateObject)
} else {
s.setStateObject(newobj)
}
s.AccountCreate++
return newobj, prev
}
@ -1390,6 +1398,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Finalise all the dirty storage states and write them into the tries
s.Finalise(deleteEmptyObjects)
var start time.Time
if s.IsParallelHasher {
toDelete := make([]*stateObject, 0, len(s.stateObjectsPending))
toUpdateRoot := make([]*stateObject, 0, len(s.stateObjectsPending))
@ -1444,13 +1453,18 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
trie.ExecuteInParallelPool(job)
}
}
start = time.Now()
allJobs.Wait()
s.waitUpdateRoot = time.Since(start)
start = time.Now()
// this has to be done in serial
//for i, obj := range toUpdateRoot {
// //s.updateStateObject(obj)
// s.updateStateObjectWithHashedKeyAndEncodedData(obj, encodedData[i], hashedKeys[i])
//}
s.updateStateObjectsInBatch(keyCopies, hexKeys, encodedData, hashedStrings)
s.updateObj = time.Since(start)
} else {
for addr := range s.stateObjectsPending {
obj := s.stateObjects[addr]
@ -1473,11 +1487,23 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
if s.IsParallelHasher {
s.trie.UseParallelHasher(true)
defer s.trie.UseParallelHasher(false)
start = time.Now()
defer func() {
s.hashTrie = time.Since(start)
}()
}
return s.trie.Hash()
}
func (s *StateDB) DumpFinalizeDetail() (waitUpdateRoot, updateObj, hashTrie time.Duration) {
waitUpdateRoot, s.waitUpdateRoot = s.waitUpdateRoot, 0
updateObj, s.updateObj = s.updateObj, 0
hashTrie, s.hashTrie = s.hashTrie, 0
return
}
// Prepare sets the current transaction hash and index and block hash which is
// used when the EVM emits new state logs.
func (s *StateDB) Prepare(thash, bhash common.Hash, ti int) {

Просмотреть файл

@ -448,36 +448,38 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
}
if cfg.MSRAVMSettings.CalWarmupMiss {
var checkDb = statedb
if pair := statedb.GetPair(); pair != nil && reuseStatus.BaseStatus != cmptypes.Hit {
checkDb = pair
}
if checkDb.HaveMiss() {
word := "Should(Hit)"
if reuseStatus.BaseStatus != cmptypes.Hit {
word = fmt.Sprintf("May(%s)", reuseStatus.BaseStatus)
if reuseStatus.BaseStatus == cmptypes.NoPreplay {
txListen := p.bc.MSRACache.GetTxListen(tx.Hash())
if txListen == nil || txListen.ListenTimeNano > blockPre.ListenTimeNano {
word = "Cannot(NoPreplay)"
}
var checkDb = statedb
if pair := statedb.GetPair(); pair != nil && reuseStatus.BaseStatus != cmptypes.Hit {
checkDb = pair
}
if checkDb.HaveMiss() {
word := "Should(Hit)"
if reuseStatus.BaseStatus != cmptypes.Hit {
word = fmt.Sprintf("May(%s)", reuseStatus.BaseStatus)
if reuseStatus.BaseStatus == cmptypes.NoPreplay {
txListen := p.bc.MSRACache.GetTxListen(tx.Hash())
if txListen == nil || txListen.ListenTimeNano > blockPre.ListenTimeNano {
word = "Cannot(NoPreplay)"
}
}
cache.WarmupMissTxnCount[word]++
cache.AddrWarmupMiss[word] += statedb.AddrWarmupMiss
cache.AddrNoWarmup[word] += statedb.AddrNoWarmup
cache.AddrWarmupHelpless[word] += len(statedb.AddrWarmupHelpless)
cache.KeyWarmupMiss[word] += statedb.KeyWarmupMiss
cache.KeyNoWarmup[word] += statedb.KeyNoWarmup
cache.KeyWarmupHelpless[word] += statedb.KeyWarmupHelpless
}
cache.WarmupMissTxnCount[word]++
cache.AccountCreate[word] += statedb.AccountCreate
cache.AddrWarmupMiss[word] += statedb.AddrWarmupMiss
cache.AddrWarmupHelpless[word] += len(statedb.AddrWarmupHelpless)
cache.KeyWarmupMiss[word] += statedb.KeyWarmupMiss
cache.KeyWarmupHelpless[word] += statedb.KeyWarmupHelpless
statedb.ClearMiss()
if pair := statedb.GetPair(); pair != nil {
pair.ClearMiss()
if cfg.MSRAVMSettings.WarmupMissDetail {
cache.AddrNoWarmup[word] += statedb.AddrNoWarmup
cache.KeyNoWarmup[word] += statedb.KeyNoWarmup
}
}
statedb.ClearMiss()
if pair := statedb.GetPair(); pair != nil {
pair.ClearMiss()
}
} else {
if cfg.MSRAVMSettings.TxApplyPerfLogging {
var baseApply time.Duration
@ -520,18 +522,19 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if cache.ToScreen && cache.Apply[len(cache.Apply)-1] > time.Millisecond*50 {
applyCost := cache.Apply[len(cache.Apply)-1]
cache.LongExecutionCost += applyCost
if applyCost > cache.MaxLongExecutionCost {
cache.MaxLongExecutionCost = applyCost
if len(reuseResult) > 0 {
cache.LongExecutionCost += cache.Apply[len(cache.Apply)-1]
if cache.Apply[len(cache.Apply)-1] > cache.MaxLongExecutionCost {
cache.MaxLongExecutionCost = cache.Apply[len(cache.Apply)-1]
}
if len(reuseResult) > 0 {
if cache.Reuse[len(cache.Reuse)-1] > cache.MaxLongExecutionReuseCost {
cache.MaxLongExecutionReuseCost = cache.Reuse[len(cache.Reuse)-1]
}
}
context := []interface{}{
"number", block.Number(), "hash", block.Hash(), "index", i, "tx", tx.Hash().Hex(),
//"now time", fmt.Sprintf("%.3fs", float64(runtime.GetNanoTime()-runtime.GetRunTimeInitTime())/1e9),
"apply cost", common.PrettyDuration(applyCost),
"apply cost", common.PrettyDuration(cache.Apply[len(cache.Apply)-1]),
"cum cost", common.PrettyDuration(cache.LongExecutionCost),
"max cost", common.PrettyDuration(cache.MaxLongExecutionCost),
}
@ -574,7 +577,8 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), block.Uncles())
cache.Finalize += time.Since(t0)
cache.WaitUpdateRoot, cache.UpdateObj, cache.HashTrie = statedb.DumpFinalizeDetail()
cache.Finalize = time.Since(t0)
cache.ReuseResult = reuseResult
return receipts, allLogs, *usedGas, nil
}

Просмотреть файл

@ -303,6 +303,7 @@ type TxPool struct {
chainconfig *params.ChainConfig
chain blockChain
gasPrice *big.Int
listenFeed event.Feed
txFeed event.Feed
scope event.SubscriptionScope
signer types.Signer
@ -690,6 +691,12 @@ func (pool *TxPool) Stop() {
log.Info("Transaction pool stopped")
}
// SubscribeListenTxsEvent registers a subscription of ListenTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeListenTxsEvent(ch chan<- ListenTxsEvent) event.Subscription {
return pool.scope.Track(pool.listenFeed.Subscribe(ch))
}
// SubscribeNewTxsEvent registers a subscription of NewTxsEvent and
// starts sending event to the given channel.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- NewTxsEvent) event.Subscription {
@ -1124,6 +1131,9 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
go func(txs []*types.Transaction) {
pool.listenFeed.Send(ListenTxsEvent{txs})
}(txs)
return pool.addTxs(txs, false, false)
}

Просмотреть файл

@ -70,7 +70,7 @@ type MSRAVMConfig struct {
TxApplyPerfLogging bool
PerfLogging bool
ParallelizeReuse bool
CalWarmupMiss bool
WarmupMissDetail bool
ReportMissDetail bool
}

Просмотреть файл

@ -212,7 +212,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
var cmpr *cmpreuse.Cmpreuse
var msracache *cache.GlobalCache
msracache = cache.NewGlobalCache(60*6, 60*6000, int(float64(config.TxPool.GlobalSlots) * 1.5), config.MSRAVMSettings.LogRoot)
msracache = cache.NewGlobalCache(60*6, 60*6000, 6144, config.MSRAVMSettings.LogRoot)
msracache.Synced = eth.Synced
if vmConfig.MSRAVMSettings.EnablePreplay || vmConfig.MSRAVMSettings.GroundRecord {
cmpr = cmpreuse.NewCmpreuse()

3
go.mod
Просмотреть файл

@ -25,7 +25,7 @@ require (
github.com/go-stack/stack v1.8.0
github.com/golang/protobuf v1.3.2-0.20190517061210-b285ee9cfc6c
github.com/golang/snappy v0.0.1
github.com/google/go-cmp v0.3.1
github.com/google/go-cmp v0.3.1 // indirect
github.com/gorilla/websocket v1.4.1-0.20190629185528-ae1634f6a989
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277
github.com/hashicorp/golang-lru v0.5.4
@ -44,6 +44,7 @@ require (
github.com/olekukonko/tablewriter v0.0.2-0.20190409134802-7e037d187b0c
github.com/pborman/uuid v0.0.0-20170112150404-1b00554d8222
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/pkg/errors v0.8.1
github.com/prometheus/tsdb v0.6.2-0.20190402121629-4f204dcbc150
github.com/rjeczalik/notify v0.9.1
github.com/robertkrimen/otto v0.0.0-20170205013659-6a77b7cbc37d

3
go.sum
Просмотреть файл

@ -89,8 +89,6 @@ github.com/gorilla/websocket v1.4.1-0.20190629185528-ae1634f6a989 h1:giknQ4mEuDF
github.com/gorilla/websocket v1.4.1-0.20190629185528-ae1634f6a989/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277 h1:E0whKxgp2ojts0FDgUA8dl62bmH0LxKanMoBr6MDTDM=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/hashicorp/golang-lru v0.0.0-20160813221303-0a025b7e63ad h1:eMxs9EL0PvIGS9TTtxg4R+JxuPGav82J8rA+GFnY7po=
github.com/hashicorp/golang-lru v0.0.0-20160813221303-0a025b7e63ad/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
@ -181,7 +179,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/subchen/go-trylock v1.3.0 h1:E7D8v3cWRFYtnnd6whjIM97BjbGp4JTq2LIfkd7TzYM=
github.com/subchen/go-trylock/v2 v2.0.0 h1:XAZYp/ZvkBFuvSPAeGM0TjbMby/mHoWnnLBAv2FidUw=
github.com/subchen/go-trylock/v2 v2.0.0/go.mod h1:jjSakPS+IvBCtFw5Fao9rQqdiCnF0ZrkzVkauvkZzLY=
github.com/syndtr/goleveldb v1.0.1-0.20190923125748-758128399b1d h1:gZZadD8H+fF+n9CmNhYL1Y0dJB+kLOmKd7FbPJLeGHs=

55
optipreplayer/cache/global.go поставляемый
Просмотреть файл

@ -48,15 +48,15 @@ type GlobalCache struct {
TxMu sync.RWMutex
TxListenCache *lru.Cache
TxEnpoolCache *lru.Cache
TxPackageCache *lru.Cache
TxEnqueueCache *lru.Cache
// Preplay result
PreplayMu sync.RWMutex
PreplayRoundIDMu sync.RWMutex
PreplayCache *lru.Cache // Result Cache
PreplayCacheSize int
PreplayRoundID uint64
PreplayRoundIDMu sync.RWMutex
PreplayTimestamp uint64 // Last time stamp
// Gas used cache
@ -105,7 +105,8 @@ func NewGlobalCache(bSize int, tSize int, pSize int, logRoot string) *GlobalCach
g.BlockPreCache, _ = lru.New(bSize)
g.BlockCache, _ = lru.New(bSize)
g.TxListenCache, _ = lru.New(tSize * 5)
g.TxListenCache, _ = lru.New(tSize * 10)
g.TxEnpoolCache, _ = lru.New(tSize * 5)
g.TxPackageCache, _ = lru.New(tSize)
g.TxEnqueueCache, _ = lru.New(tSize)
@ -151,34 +152,29 @@ func (r *GlobalCache) FillBigIntPool() {
func (r *GlobalCache) GetTrieAndWObjectSizes() (cachedTxCount int, cachedTxWithTraceCount int,
maxTrieNodeCount int64, totalTrieNodeCount int64, totalMixTrieNodeCount int64, totalRWTrieNodeCount int64,
wobjectCount uint64, wobjectStorageSize uint64) {
txHashes := r.PreplayCache.Keys()
txHashes := r.KeysOfTxPreplay()
cachedTxCount = len(txHashes)
cachedTxWithTraceCount = 0
maxTrieNodeCount = 0
totalTrieNodeCount = 0
for _, key := range txHashes {
p, _ := r.PreplayCache.Peek(key)
if p != nil {
tp := p.(*TxPreplay)
wc, sc := tp.PreplayResults.GetWObjectSize()
wobjectCount += wc
wobjectStorageSize += sc
t := tp.PreplayResults.TraceTrie
if t != nil {
if txPreplay := r.PeekTxPreplay(key); txPreplay != nil {
objectSize, storageItemCount := txPreplay.PreplayResults.GetWObjectSize()
wobjectCount += objectSize
wobjectStorageSize += storageItemCount
if txPreplay.PreplayResults.MixTree != nil {
totalMixTrieNodeCount += txPreplay.PreplayResults.MixTree.GetNodeCount()
}
if txPreplay.PreplayResults.TraceTrie != nil {
cachedTxWithTraceCount++
nc := t.GetNodeCount()
totalTrieNodeCount += nc
if nc > maxTrieNodeCount {
maxTrieNodeCount = nc
nodeCount := txPreplay.PreplayResults.TraceTrie.GetNodeCount()
totalTrieNodeCount += nodeCount
if nodeCount > maxTrieNodeCount {
maxTrieNodeCount = nodeCount
}
}
mt := tp.PreplayResults.MixTree
if mt != nil {
totalMixTrieNodeCount += mt.GetNodeCount()
}
rwt := tp.PreplayResults.RWRecordTrie
if rwt != nil {
totalRWTrieNodeCount += rwt.GetNodeCount()
if txPreplay.PreplayResults.RWRecordTrie != nil {
totalRWTrieNodeCount += txPreplay.PreplayResults.RWRecordTrie.GetNodeCount()
}
}
}
@ -187,12 +183,10 @@ func (r *GlobalCache) GetTrieAndWObjectSizes() (cachedTxCount int, cachedTxWithT
}
func (r *GlobalCache) GCWObjects() () {
txHashes := r.PreplayCache.Keys()
txHashes := r.KeysOfTxPreplay()
for _, key := range txHashes {
p, _ := r.PreplayCache.Peek(key)
if p != nil {
tp := p.(*TxPreplay)
tp.PreplayResults.GCWObjects()
if txPreplay := r.PeekTxPreplay(key); txPreplay != nil {
txPreplay.PreplayResults.GCWObjects()
}
}
}
@ -201,12 +195,10 @@ func (r *GlobalCache) GCWObjects() () {
func (r *GlobalCache) ResetGlobalCache(bSize int, tSize int, pSize int) bool {
r.BlockMu.Lock()
r.PreplayMu.Lock()
r.PreplayRoundIDMu.Lock()
r.TxMu.Lock()
defer func() {
r.TxMu.Unlock()
r.PreplayMu.Unlock()
r.PreplayRoundIDMu.Unlock()
r.BlockMu.Unlock()
}()
@ -217,7 +209,8 @@ func (r *GlobalCache) ResetGlobalCache(bSize int, tSize int, pSize int) bool {
}
if tSize != 0 {
r.TxListenCache, _ = lru.New(tSize)
r.TxListenCache, _ = lru.New(tSize * 10)
r.TxEnpoolCache, _ = lru.New(tSize * 5)
r.TxPackageCache, _ = lru.New(tSize)
r.TxEnqueueCache, _ = lru.New(tSize)
}

12
optipreplayer/cache/listen.go поставляемый
Просмотреть файл

@ -171,6 +171,14 @@ func (r *GlobalCache) GetTxListen(hash common.Hash) *TxListen {
return exTx
}
func (r *GlobalCache) GetTxEnpool(hash common.Hash) uint64 {
if result, ok := r.TxEnpoolCache.Peek(hash); ok {
return result.(uint64)
} else {
return 0
}
}
func (r *GlobalCache) GetTxPackage(hash common.Hash) uint64 {
if result, ok := r.TxPackageCache.Peek(hash); ok {
return result.(uint64)
@ -208,6 +216,10 @@ func (r *GlobalCache) CommitTxListen(tx *TxListen) {
r.TxListenCache.ContainsOrAdd(tx.Tx.Hash(), tx)
}
func (r *GlobalCache) CommitTxEnpool(tx common.Hash, txEnpool uint64) {
r.TxEnpoolCache.ContainsOrAdd(tx, txEnpool)
}
func (r *GlobalCache) CommitTxPackage(tx common.Hash, txPackage uint64) {
r.TxPackageCache.ContainsOrAdd(tx, txPackage)
}

442
optipreplayer/cache/log.go поставляемый
Просмотреть файл

@ -19,51 +19,60 @@ const k = 10
// LogBlockInfo define blockInfo log format
type LogBlockInfo struct {
TxnApply int64 `json:"apply"`
BlockFinalize int64 `json:"finalize"`
Reuse int64 `json:"reuse"`
RealApply int64 `json:"realApply"`
TxnFinalize int64 `json:"txFinalize"`
Update int64 `json:"updatePair"`
GetRW int64 `json:"getRW"`
SetDB int64 `json:"setDB"`
WaitRealApplyEnd int64 `json:"waitRealApplyEnd"`
RunTx int64 `json:"runTx"`
WaitReuseEnd int64 `json:"waitReuseEnd"`
NoListen int `json:"L"`
NoPackage int `json:"Pa"`
NoEnqueue int `json:"E"`
NoPreplay int `json:"Pr"`
Hit int `json:"H"`
Miss int `json:"M"`
Unknown int `json:"U"`
TxPreplayLock int `json:"tL"`
AbortedTrace int `json:"aR"`
AbortedMix int `json:"aM"`
AbortedDelta int `json:"aD"`
AbortedTrie int `json:"aT"`
MixHit int `json:"MH"`
AllDepMixHit int `json:"DMH"`
AllDetailMixHit int `json:"TMH"`
PartialMixHit int `json:"PMH"`
AllDeltaMixHit int `json:"AMH"`
PartialDeltaMixHit int `json:"MMH"`
UnhitHeadCount [10]int `json:"UHC"`
TrieHit int `json:"TH"`
DeltaHit int `json:"EH"`
TraceHit int `json:"RH"`
ReuseGas int `json:"reuseGas"`
ProcTime int64 `json:"procTime"`
RunMode string `json:"runMode"`
TxnCount int `json:"txnCount"`
Header *types.Header `json:"header"`
TxnApply int64 `json:"apply"`
BlockFinalize int64 `json:"finalize"`
Reuse int64 `json:"reuse"`
RealApply int64 `json:"realApply"`
TxnFinalize int64 `json:"txFinalize"`
Update int64 `json:"updatePair"`
GetRW int64 `json:"getRW"`
SetDB int64 `json:"setDB"`
WaitRealApplyEnd int64 `json:"waitRealApplyEnd"`
RunTx int64 `json:"runTx"`
WaitReuseEnd int64 `json:"waitReuseEnd"`
NoListen int `json:"L"`
NoListenAndNoEthermine int `json:"L&NoEthermine"`
NoEnpool int `json:"Ep"`
NoPackage int `json:"Pa"`
NoEnqueue int `json:"Eq"`
NoPreplay int `json:"Pr"`
Hit int `json:"H"`
Miss int `json:"M"`
Unknown int `json:"U"`
MixHit int `json:"MH"`
AllDepMixHit int `json:"DMH"`
AllDetailMixHit int `json:"TMH"`
PartialDetailMixHit int `json:"PMH"`
AllDeltaMixHit int `json:"AMH"`
PartialDeltaMixHit int `json:"MMH"`
UnhitHeadCount [10]int `json:"UHC"`
TraceHit int `json:"RH"`
DeltaHit int `json:"EH"`
TrieHit int `json:"TH"`
TraceMiss int `json:"TM"`
NoMatchMiss int `json:"NMM"`
TxPreplayLock int `json:"tL"`
AbortedTrace int `json:"aR"`
AbortedMix int `json:"aM"`
AbortedDelta int `json:"aD"`
AbortedTrie int `json:"aT"`
ReuseGas int `json:"reuseGas"`
ProcTime int64 `json:"procTime"`
RunMode string `json:"runMode"`
TxnCount int `json:"txnCount"`
Header *types.Header `json:"header"`
}
type MissReporter interface {
SetBlock(block *types.Block)
SetNoPreplayTxn(txn *types.Transaction, enqueue uint64)
SetMissTxn(txn *types.Transaction, miss *cmptypes.PreplayResTrieNode, value interface{}, txnType int)
ReportMiss(noListen, noPackage, noEnqueue, noPreplay uint64)
ReportMiss(noListen, noListenAndEthermine, noEnpool, noPackage, noEnqueue, noPreplay uint64)
}
// LogBlockCache define blockCache log format
@ -128,11 +137,12 @@ type LogPreplayItem struct {
type LogBlockGround []*LogRWrecord
var (
ToScreen bool
ethermine = common.HexToAddress("0xEA674fdDe714fd979de3EdF0F56AA9716B898ec8")
ToScreen bool
Process time.Duration
Process time.Duration
Apply []time.Duration
Finalize time.Duration
Reuse []time.Duration
RealApply []time.Duration
TxFinalize []time.Duration
@ -143,6 +153,11 @@ var (
RunTx []time.Duration
WaitReuseEnd []time.Duration
Finalize time.Duration
WaitUpdateRoot time.Duration
UpdateObj time.Duration
HashTrie time.Duration
ReuseGasCount uint64
MaxLongExecutionCost time.Duration
@ -152,6 +167,7 @@ var (
ReuseResult []*cmptypes.ReuseStatus
WarmupMissTxnCount = make(map[string]int)
AccountCreate = make(map[string]int)
AddrWarmupMiss = make(map[string]int)
AddrNoWarmup = make(map[string]int)
AddrWarmupHelpless = make(map[string]int)
@ -160,6 +176,7 @@ var (
KeyWarmupHelpless = make(map[string]int)
CumWarmupMissTxnCount = make(map[string]int)
CumAccountCreate = make(map[string]int)
CumAddrWarmupMiss = make(map[string]int)
CumAddrNoWarmup = make(map[string]int)
CumAddrWarmupHelpless = make(map[string]int)
@ -168,7 +185,6 @@ var (
CumKeyWarmupHelpless = make(map[string]int)
cumApply time.Duration
cumFinalize time.Duration
cumReuse time.Duration
cumRealApply time.Duration
cumTxFinalize time.Duration
@ -179,23 +195,43 @@ var (
cumRunTx time.Duration
cumWaitReuseEnd time.Duration
blkCount uint64
txnCount uint64
listen uint64
Package uint64
enqueue uint64
preplay uint64
hit uint64
unknown uint64
mixHitCount uint64
trieHitCount uint64
deltaHitCount uint64
traceHitCount uint64
TxPreplayLock uint64
AbortedTrace uint64
AbortedMix uint64
AbortedDelta uint64
AbortedTrie uint64
cumFinalize time.Duration
cumWaitUpdateRoot time.Duration
cumUpdateObj time.Duration
cumHashTrie time.Duration
blkCount uint64
txnCount uint64
listen uint64
listenOrEthermine uint64
enpool uint64
Package uint64
enqueue uint64
preplay uint64
hit uint64
unknown uint64
miss uint64
mixHit uint64
allDepMixHit uint64
allDetailMixHit uint64
partialDetailMixHit uint64
allDeltaMixHit uint64
partialDeltaMixHit uint64
traceHit uint64
deltaHit uint64
trieHit uint64
txPreplayLock uint64
abortedTrace uint64
abortedMix uint64
abortedDelta uint64
abortedTrie uint64
traceMiss uint64
noMatchMiss uint64
LockCount [4]uint64
)
func SumDuration(durations []time.Duration) (sum time.Duration) {
@ -214,6 +250,7 @@ func SumCount(counts []int64) (sum int64) {
func ResetLogVar(size int) {
WarmupMissTxnCount = make(map[string]int)
AccountCreate = make(map[string]int)
AddrWarmupMiss = make(map[string]int)
AddrNoWarmup = make(map[string]int)
AddrWarmupHelpless = make(map[string]int)
@ -222,7 +259,6 @@ func ResetLogVar(size int) {
KeyWarmupHelpless = make(map[string]int)
Apply = make([]time.Duration, 0, size)
Finalize = 0
Reuse = make([]time.Duration, 0, size)
RealApply = make([]time.Duration, 0, size)
TxFinalize = make([]time.Duration, 0, size)
@ -233,6 +269,11 @@ func ResetLogVar(size int) {
RunTx = make([]time.Duration, 0, size)
WaitReuseEnd = make([]time.Duration, 0, size)
Finalize = 0
WaitUpdateRoot = 0
UpdateObj = 0
HashTrie = 0
ReuseGasCount = 0
}
@ -268,7 +309,7 @@ func (r *GlobalCache) LogPrint(filePath string, fileName string, v interface{})
}
// InfoPrint block info to block folder
func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool, reporter MissReporter, statedb *state.StateDB) {
func (r *GlobalCache) InfoPrint(block *types.Block, signer types.Signer, cfg vm.Config, synced bool, reporter MissReporter, statedb *state.StateDB) {
var (
sumApply = SumDuration(Apply)
@ -313,9 +354,16 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
txPackage := r.GetTxPackage(tx.Hash())
if txPackage == 0 || txPackage > processTimeNano {
infoResult.NoPackage++
txListen := r.GetTxListen(tx.Hash())
if txListen == nil || txListen.ListenTimeNano > processTimeNano {
infoResult.NoListen++
txEnpool := r.GetTxEnpool(tx.Hash())
if txEnpool == 0 || txEnpool > processTimeNano {
infoResult.NoEnpool++
txListen := r.GetTxListen(tx.Hash())
if txListen == nil || txListen.ListenTimeNano > processTimeNano {
infoResult.NoListen++
if sender, _ := types.Sender(signer, tx); sender != ethermine {
infoResult.NoListenAndNoEthermine++
}
}
}
}
}
@ -330,7 +378,7 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
case cmptypes.AllDetailHit:
infoResult.AllDetailMixHit++
case cmptypes.PartialHit:
infoResult.PartialMixHit++
infoResult.PartialDetailMixHit++
unHitHead := ReuseResult[index].MixHitStatus.DepUnmatchedInHead
if unHitHead < 9 {
infoResult.UnhitHeadCount[unHitHead]++
@ -352,6 +400,12 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
}
case cmptypes.Miss:
infoResult.Miss++
switch ReuseResult[index].MissType {
case cmptypes.TraceMiss:
infoResult.TraceMiss++
case cmptypes.NoMatchMiss:
infoResult.NoMatchMiss++
}
case cmptypes.Unknown:
infoResult.Unknown++
switch ReuseResult[index].AbortStage {
@ -399,49 +453,68 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
return
}
if cfg.MSRAVMSettings.CalWarmupMiss {
for word := range WarmupMissTxnCount {
CumWarmupMissTxnCount[word] += WarmupMissTxnCount[word]
CumAddrWarmupMiss[word] += AddrWarmupMiss[word]
CumAddrNoWarmup[word] += AddrNoWarmup[word]
CumAddrWarmupHelpless[word] += AddrWarmupHelpless[word]
CumKeyWarmupMiss[word] += KeyWarmupMiss[word]
CumKeyNoWarmup[word] += KeyNoWarmup[word]
CumKeyWarmupHelpless[word] += KeyWarmupHelpless[word]
}
for word := range WarmupMissTxnCount {
CumWarmupMissTxnCount[word] += WarmupMissTxnCount[word]
CumAccountCreate[word] += AccountCreate[word]
CumAddrWarmupMiss[word] += AddrWarmupMiss[word]
CumAddrNoWarmup[word] += AddrNoWarmup[word]
CumAddrWarmupHelpless[word] += AddrWarmupHelpless[word]
CumKeyWarmupMiss[word] += KeyWarmupMiss[word]
CumKeyNoWarmup[word] += KeyNoWarmup[word]
CumKeyWarmupHelpless[word] += KeyWarmupHelpless[word]
}
var keySort []string
for word := range CumWarmupMissTxnCount {
keySort = append(keySort, word)
}
sort.Strings(keySort)
var keySort []string
for word := range CumWarmupMissTxnCount {
keySort = append(keySort, word)
}
sort.Strings(keySort)
for _, word := range keySort {
if CumAddrWarmupMiss[word]+CumAddrWarmupHelpless[word]+CumKeyWarmupMiss[word]+CumKeyWarmupHelpless[word] > 0 {
context := []interface{}{"type", word, "cum txn miss", CumWarmupMissTxnCount[word], "txn miss", WarmupMissTxnCount[word]}
if CumAddrWarmupMiss[word] > 0 {
for _, word := range keySort {
if CumAddrWarmupMiss[word]+CumAddrWarmupHelpless[word]+CumKeyWarmupMiss[word]+CumKeyWarmupHelpless[word] > 0 {
context := []interface{}{"type", word, "cum txn miss", CumWarmupMissTxnCount[word], "txn miss", WarmupMissTxnCount[word],
"cum addr create", CumAccountCreate[word], "addr create", AccountCreate[word]}
if CumAddrWarmupMiss[word] > 0 {
if cfg.MSRAVMSettings.WarmupMissDetail {
context = append(context, "cum addr miss-helpless",
fmt.Sprintf("%d(%d)-%d", CumAddrWarmupMiss[word], CumAddrNoWarmup[word], CumAddrWarmupHelpless[word]))
if AddrWarmupMiss[word] > 0 || AddrWarmupHelpless[word] > 0 {
} else {
context = append(context, "cum addr miss-helpless",
fmt.Sprintf("%d-%d", CumAddrWarmupMiss[word], CumAddrWarmupHelpless[word]))
}
if AddrWarmupMiss[word] > 0 || AddrWarmupHelpless[word] > 0 {
if cfg.MSRAVMSettings.WarmupMissDetail {
context = append(context, "addr miss-helpless",
fmt.Sprintf("%d(%d)-%d", AddrWarmupMiss[word], AddrNoWarmup[word], AddrWarmupHelpless[word]))
} else {
context = append(context, "addr miss-helpless",
fmt.Sprintf("%d-%d", AddrWarmupMiss[word], AddrWarmupHelpless[word]))
}
}
if CumKeyWarmupMiss[word] > 0 {
context = append(context, "cum key miss",
fmt.Sprintf("%d(%d)-%d", CumKeyWarmupMiss[word], CumKeyNoWarmup[word], CumKeyWarmupHelpless[word]))
if KeyWarmupMiss[word] > 0 || KeyWarmupHelpless[word] > 0 {
context = append(context, "key miss",
fmt.Sprintf("%d(%d)-%d", KeyWarmupMiss[word], KeyNoWarmup[word], KeyWarmupHelpless[word]))
}
}
log.Info("Warmup miss statistics", context...)
}
if CumKeyWarmupMiss[word] > 0 {
if cfg.MSRAVMSettings.WarmupMissDetail {
context = append(context, "cum key miss-helpless",
fmt.Sprintf("%d(%d)-%d", CumKeyWarmupMiss[word], CumKeyNoWarmup[word], CumKeyWarmupHelpless[word]))
} else {
context = append(context, "cum key miss-helpless",
fmt.Sprintf("%d-%d", CumKeyWarmupMiss[word], CumKeyWarmupHelpless[word]))
}
if KeyWarmupMiss[word] > 0 || KeyWarmupHelpless[word] > 0 {
if cfg.MSRAVMSettings.WarmupMissDetail {
context = append(context, "key miss-helpless",
fmt.Sprintf("%d(%d)-%d", KeyWarmupMiss[word], KeyNoWarmup[word], KeyWarmupHelpless[word]))
} else {
context = append(context, "key miss-helpless",
fmt.Sprintf("%d-%d", KeyWarmupMiss[word], KeyWarmupHelpless[word]))
}
}
}
log.Info("Warmup miss statistics", context...)
}
}
cumApply += sumApply
cumFinalize += Finalize
cumReuse += sumReuse
cumRealApply += sumRealApply
cumTxFinalize += sumTxFinalize
@ -452,7 +525,12 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
cumRunTx += sumRunTx
cumWaitReuseEnd += sumWaitReuseEnd
context := []interface{}{"apply", common.PrettyDuration(sumApply), "finalize", common.PrettyDuration(Finalize)}
cumFinalize += Finalize
cumWaitUpdateRoot += WaitUpdateRoot
cumUpdateObj += UpdateObj
cumHashTrie += HashTrie
context := []interface{}{"apply", common.PrettyDuration(sumApply)}
if sumApply != 0 {
context = append(context,
"reuse/apply", fmt.Sprintf("%.2f", float64(sumReuse)/float64(sumApply)),
@ -478,9 +556,17 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
)
}
}
if Finalize != 0 {
context = append(context,
"finalize", common.PrettyDuration(Finalize),
"waitUpdateRoot/finalize", fmt.Sprintf("%.2f", float64(WaitUpdateRoot)/float64(Finalize)),
"updateObj/finalize", fmt.Sprintf("%.2f", float64(UpdateObj)/float64(Finalize)),
"hashTrie/finalize", fmt.Sprintf("%.2f", float64(HashTrie)/float64(Finalize)),
)
}
log.Info("Time consumption detail", context...)
context = []interface{}{"apply", common.PrettyDuration(cumApply), "finalize", common.PrettyDuration(cumFinalize)}
context = []interface{}{"apply", common.PrettyDuration(cumApply)}
if cumApply != 0 {
context = append(context,
"reuse/apply", fmt.Sprintf("%.2f", float64(cumReuse)/float64(cumApply)),
@ -506,17 +592,32 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
)
}
}
if cumFinalize != 0 {
context = append(context,
"finalize", common.PrettyDuration(cumFinalize),
"waitUpdateRoot/finalize", fmt.Sprintf("%.2f", float64(cumWaitUpdateRoot)/float64(cumFinalize)),
"updateObj/finalize", fmt.Sprintf("%.2f", float64(cumUpdateObj)/float64(cumFinalize)),
"hashTrie/finalize", fmt.Sprintf("%.2f", float64(cumHashTrie)/float64(cumFinalize)),
)
}
log.Info("Cumulative time consumption detail", context...)
if cfg.MSRAVMSettings.EnablePreplay && cfg.MSRAVMSettings.CmpReuse {
context = []interface{}{"Total", fmt.Sprintf("%03d", infoResult.TxnCount)}
listenCnt := infoResult.TxnCount - infoResult.NoListen
enpoolCnt := infoResult.TxnCount - infoResult.NoEnpool
packageCnt := infoResult.TxnCount - infoResult.NoPackage
enqueueCnt := infoResult.TxnCount - infoResult.NoEnqueue
preplayCnt := infoResult.TxnCount - infoResult.NoPreplay
var listenRate, packageRate, enqueueRate, preplayRate, hitRate, missRate, unknownRate, mixHitRate, trieHitRate, deltaHitRate, traceHitRate float64
var reuseGasRate float64
if infoResult.TxnCount > 0 {
var listenRate, enpoolRate, packageRate, enqueueRate, preplayRate, hitRate, missRate, unknownRate,
mixHitRate, trieHitRate, deltaHitRate, traceHitRate float64
var reuseGasRate float64
listenRate = float64(listenCnt) / float64(infoResult.TxnCount)
enpoolRate = float64(enpoolCnt) / float64(infoResult.TxnCount)
packageRate = float64(packageCnt) / float64(infoResult.TxnCount)
enqueueRate = float64(enqueueCnt) / float64(infoResult.TxnCount)
preplayRate = float64(preplayCnt) / float64(infoResult.TxnCount)
@ -527,79 +628,100 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
trieHitRate = float64(infoResult.TrieHit) / float64(infoResult.TxnCount)
deltaHitRate = float64(infoResult.DeltaHit) / float64(infoResult.TxnCount)
traceHitRate = float64(infoResult.TraceHit) / float64(infoResult.TxnCount)
reuseGasRate = float64(infoResult.ReuseGas) / float64(infoResult.Header.GasUsed)
}
context := []interface{}{
"Total", fmt.Sprintf("%03d", infoResult.TxnCount),
"Listen", fmt.Sprintf("%03d(%.2f)", listenCnt, listenRate),
"Package", fmt.Sprintf("%03d(%.2f)", packageCnt, packageRate),
"Enqueue", fmt.Sprintf("%03d(%.2f)", enqueueCnt, enqueueRate),
"Preplay", fmt.Sprintf("%03d(%.2f)", preplayCnt, preplayRate),
"Hit", fmt.Sprintf("%03d(%.2f)", infoResult.Hit, hitRate),
"MixHit", fmt.Sprintf("%03d(%.2f)-[AllDep:%03d|AllDetail:%03d|Mix:%03d|AllDelta:%03d|DeltaMix:%03d]", infoResult.MixHit, mixHitRate,
infoResult.AllDepMixHit, infoResult.AllDetailMixHit, infoResult.PartialMixHit, infoResult.AllDeltaMixHit, infoResult.PartialDeltaMixHit),
"MixUnhitHead", fmt.Sprint(infoResult.UnhitHeadCount),
}
if infoResult.TrieHit > 0 || infoResult.DeltaHit > 0 || infoResult.TraceHit > 0 {
context = append(context, "RH-DH-TH", fmt.Sprintf("%03d(%.2f)-%03d(%.2f)-%03d(%.2f)",
infoResult.TraceHit, traceHitRate, infoResult.DeltaHit, deltaHitRate, infoResult.TrieHit, trieHitRate))
}
if infoResult.Miss > 0 {
context = append(context, "Miss", fmt.Sprintf("%03d(%.2f)", infoResult.Miss, missRate))
}
if infoResult.Unknown > 0 {
context = append(context, "Unknown", fmt.Sprintf("%03d(%.2f)", infoResult.Unknown, unknownRate))
context = append(context, "TxPreplayLock", infoResult.TxPreplayLock)
if cfg.MSRAVMSettings.ParallelizeReuse {
context = append(context, "AbortStage(R-M-D-T)", fmt.Sprintf("%03d-%03d-%03d-%03d",
infoResult.AbortedTrace, infoResult.AbortedMix, infoResult.AbortedDelta, infoResult.AbortedTrie))
context = append(context, "Total", fmt.Sprintf("%03d", infoResult.TxnCount),
"Listen", fmt.Sprintf("%03d(%.2f)", listenCnt, listenRate),
"Enpool", fmt.Sprintf("%03d(%.2f)", enpoolCnt, enpoolRate),
"Package", fmt.Sprintf("%03d(%.2f)", packageCnt, packageRate),
"Enqueue", fmt.Sprintf("%03d(%.2f)", enqueueCnt, enqueueRate),
"Preplay", fmt.Sprintf("%03d(%.2f)", preplayCnt, preplayRate),
"Hit", fmt.Sprintf("%03d(%.2f)", infoResult.Hit, hitRate),
"MixHit", fmt.Sprintf("%03d(%.2f)-[AllDep:%03d|AllDetail:%03d|PartialDetail:%03d|AllDelta:%03d|PartialDelta:%03d]", infoResult.MixHit, mixHitRate,
infoResult.AllDepMixHit, infoResult.AllDetailMixHit, infoResult.PartialDetailMixHit, infoResult.AllDeltaMixHit, infoResult.PartialDeltaMixHit),
"MixUnhitHead", fmt.Sprint(infoResult.UnhitHeadCount))
if infoResult.TrieHit > 0 || infoResult.DeltaHit > 0 || infoResult.TraceHit > 0 {
context = append(context, "RH-DH-TH", fmt.Sprintf("%03d(%.2f)-%03d(%.2f)-%03d(%.2f)",
infoResult.TraceHit, traceHitRate, infoResult.DeltaHit, deltaHitRate, infoResult.TrieHit, trieHitRate))
}
if infoResult.Miss > 0 {
context = append(context, "Miss", fmt.Sprintf("%03d(%.2f)-[TraceMiss:%03d|NoMatchMiss:%03d]",
infoResult.Miss, missRate, infoResult.TraceMiss, infoResult.NoMatchMiss))
}
if infoResult.Unknown > 0 {
context = append(context, "Unknown", fmt.Sprintf("%03d(%.2f)", infoResult.Unknown, unknownRate),
"TxPreplayLock", infoResult.TxPreplayLock)
if cfg.MSRAVMSettings.ParallelizeReuse {
context = append(context, "AbortStage(R-M-D-T)", fmt.Sprintf("%03d-%03d-%03d-%03d",
infoResult.AbortedTrace, infoResult.AbortedMix, infoResult.AbortedDelta, infoResult.AbortedTrie))
}
}
context = append(context, "ReuseGas", fmt.Sprintf("%d(%.2f)", infoResult.ReuseGas, reuseGasRate))
}
context = append(context, "ReuseGas", fmt.Sprintf("%d(%.2f)", infoResult.ReuseGas, reuseGasRate))
log.Info("Block reuse", context...)
blkCount++
txnCount += uint64(infoResult.TxnCount)
listen += uint64(listenCnt)
listenOrEthermine += uint64(infoResult.TxnCount - infoResult.NoListenAndNoEthermine)
enpool += uint64(enpoolCnt)
Package += uint64(packageCnt)
enqueue += uint64(enqueueCnt)
preplay += uint64(preplayCnt)
hit += uint64(infoResult.Hit)
miss += uint64(infoResult.Miss)
unknown += uint64(infoResult.Unknown)
mixHitCount += uint64(infoResult.MixHit)
trieHitCount += uint64(infoResult.TrieHit)
deltaHitCount += uint64(infoResult.DeltaHit)
traceHitCount += uint64(infoResult.TraceHit)
mixHitRate = float64(mixHitCount) / float64(txnCount)
trieHitRate = float64(trieHitCount) / float64(txnCount)
deltaHitRate = float64(deltaHitCount) / float64(txnCount)
traceHitRate = float64(traceHitCount) / float64(txnCount)
TxPreplayLock += uint64(infoResult.TxPreplayLock)
AbortedTrace += uint64(infoResult.AbortedTrace)
AbortedMix += uint64(infoResult.AbortedMix)
AbortedDelta += uint64(infoResult.AbortedDelta)
AbortedTrie += uint64(infoResult.AbortedTrie)
mixHit += uint64(infoResult.MixHit)
allDepMixHit += uint64(infoResult.AllDepMixHit)
allDetailMixHit += uint64(infoResult.AllDetailMixHit)
partialDetailMixHit += uint64(infoResult.PartialDetailMixHit)
allDeltaMixHit += uint64(infoResult.AllDeltaMixHit)
partialDeltaMixHit += uint64(infoResult.PartialDeltaMixHit)
traceHit += uint64(infoResult.TraceHit)
deltaHit += uint64(infoResult.DeltaHit)
trieHit += uint64(infoResult.TrieHit)
traceMiss += uint64(infoResult.TraceMiss)
noMatchMiss += uint64(infoResult.NoMatchMiss)
txPreplayLock += uint64(infoResult.TxPreplayLock)
abortedTrace += uint64(infoResult.AbortedTrace)
abortedMix += uint64(infoResult.AbortedMix)
abortedDelta += uint64(infoResult.AbortedDelta)
abortedTrie += uint64(infoResult.AbortedTrie)
context = []interface{}{
"block", blkCount, "txn", txnCount,
"listen", fmt.Sprintf("%d(%.3f)", listen, float64(listen)/float64(txnCount)),
"enpool", fmt.Sprintf("%d(%.3f)", enpool, float64(enpool)/float64(txnCount)),
"package", fmt.Sprintf("%d(%.3f)", Package, float64(Package)/float64(txnCount)),
"enqueue", fmt.Sprintf("%d(%.3f)", enqueue, float64(enqueue)/float64(txnCount)),
"preplay", fmt.Sprintf("%d(%.3f)", preplay, float64(preplay)/float64(txnCount)),
"hit", fmt.Sprintf("%d(%.3f)", hit, float64(hit)/float64(txnCount)),
"MH-RH-DH-TH", fmt.Sprintf("%03d(%.2f)-%03d(%.2f)-%03d(%.2f)-%03d(%.2f)",
mixHitCount, mixHitRate, traceHitCount, traceHitRate, deltaHitCount, deltaHitRate, trieHitCount, trieHitRate),
"MixHit", fmt.Sprintf("%d(%.3f)-[AllDep:%d|AllDetail:%d|PartialDetail:%d|AllDelta:%d|PartialDelta:%d]",
mixHit, float64(mixHit)/float64(txnCount), allDepMixHit, allDetailMixHit,
partialDetailMixHit, allDeltaMixHit, partialDeltaMixHit),
"RH-DH-TH", fmt.Sprintf("%d(%.3f)-%d(%.3f)-%d(%.3f)", traceHit, float64(traceHit)/float64(txnCount),
deltaHit, float64(deltaHit)/float64(txnCount), trieHit, float64(trieHit)/float64(txnCount)),
"miss", fmt.Sprintf("%d(%.3f)-[TraceMiss:%03d|NoMatchMiss:%03d]", miss, float64(miss)/float64(txnCount),
traceMiss, noMatchMiss),
}
if unknown > 0 {
context = append(context, "unknown", fmt.Sprintf("%d(%.3f)", unknown, float64(unknown)/float64(txnCount)))
context = append(context, "txPreplayLock", TxPreplayLock)
context = append(context, "unknown", fmt.Sprintf("%d(%.3f)", unknown, float64(unknown)/float64(txnCount)),
"txPreplayLock", txPreplayLock)
if cfg.MSRAVMSettings.ParallelizeReuse {
context = append(context, "AM-AR-AD-AT", fmt.Sprintf("%03d-%03d-%03d-%03d",
AbortedMix, AbortedTrace, AbortedDelta, AbortedTrie))
context = append(context, "abortStage(R-M-D-T)", fmt.Sprintf("%d-%d-%d-%d",
abortedMix, abortedTrace, abortedDelta, abortedTrie))
}
}
log.Info("Cumulative block reuse", context...)
log.Info("Tries lock", "count", fmt.Sprintf("%d-%d-%d-%d", LockCount[0], LockCount[1], LockCount[2], LockCount[3]))
var (
enqueues = make([]uint64, 0)
noPreplayTxns types.Transactions
@ -642,7 +764,7 @@ func (r *GlobalCache) InfoPrint(block *types.Block, cfg vm.Config, synced bool,
reporter.SetMissTxn(txn, nodes[i], values[i], txnType)
}
}
reporter.ReportMiss(txnCount-listen, listen-Package, Package-enqueue, enqueue-preplay)
reporter.ReportMiss(txnCount-listen, listenOrEthermine-listen, listen-enpool, enpool-Package, Package-enqueue, enqueue-preplay)
}
}
@ -695,26 +817,16 @@ func (r *GlobalCache) CachePrint(block *types.Block, reuseResult []*cmptypes.Reu
txPreplay := r.PeekTxPreplay(tx.Hash())
if txPreplay != nil {
txPreplay.Mu.Lock()
var keys []uint64
roundKeys := txPreplay.PreplayResults.Rounds.Keys()
for _, raw := range roundKeys {
keys = append(keys, raw.(uint64))
}
var reducedKeys []uint64
for _, key := range keys {
txPreplay.RLockRound()
txCache.Rounds = txPreplay.KeysOfRound()
for _, key := range txCache.Rounds {
round, _ := txPreplay.PeekRound(key)
if round.Filled != -1 {
continue
}
reducedKeys = append(reducedKeys, key)
txCache.ReducedRounds = append(txCache.ReducedRounds, key)
}
txCache.Rounds = keys
txCache.ReducedRounds = reducedKeys
txPreplay.Mu.Unlock()
txPreplay.RUnlockRound()
} else {
txCache.Rounds = nil
txCache.ReducedRounds = nil
@ -807,15 +919,13 @@ func (r *GlobalCache) PreplayPrint(RoundID uint64, executionOrder []*types.Trans
continue
}
txPreplay.Mu.Lock()
txPreplay.RLockRound()
round, _ := txPreplay.PeekRound(RoundID)
if round == nil {
log.Debug("[PreplayPrint] getRoundID Error", "txHash", tx.Hash(), "roundID", RoundID)
preplayResult.Result = append(preplayResult.Result, nil)
txPreplay.Mu.Unlock()
txPreplay.RUnlockRound()
continue
}
@ -843,7 +953,7 @@ func (r *GlobalCache) PreplayPrint(RoundID uint64, executionOrder []*types.Trans
// currentState = round.CurrentState
// }
txPreplay.Mu.Unlock()
txPreplay.RUnlockRound()
}
// General Log

180
optipreplayer/cache/preplay.go поставляемый
Просмотреть файл

@ -48,31 +48,39 @@ type TxPreplay struct {
// Setting Info
FlagStatus bool // Flag: 0: not in, 1: already in
Mu trylock.TryLocker
}
// NewTxPreplay create new RWRecord
func NewTxPreplay(tx *types.Transaction) *TxPreplay {
preplayResults := &PreplayResults{}
preplayResults.Rounds, _ = lru.New(roundLimit)
//preplayResults.RWrecords, _ = lru.New(roundLimit)
rounds, _ := lru.New(roundLimit)
//RWrecords, _ := lru.New(roundLimit)
preplayResults.RWRecordTrie = cmptypes.NewPreplayResTrie()
preplayResults.ReadDepTree = cmptypes.NewPreplayResTrie()
preplayResults.MixTree = cmptypes.NewPreplayResTrie()
preplayResults.DeltaTree = cmptypes.NewPreplayResTrie()
preplayResults.wobjectHolderMap = make(state.ObjectHolderMap)
preplayResults.objectPointerToObjID = make(map[uintptr]uintptr)
return &TxPreplay{
PreplayResults: preplayResults,
TxHash: tx.Hash(),
GasPrice: tx.GasPrice(),
GasLimit: tx.Gas(),
Tx: tx,
Timestamp: time.Now(),
Mu: trylock.New(),
preplayResults := &PreplayResults{
Rounds: rounds,
//ReadDepTree: cmptypes.NewPreplayResTrie(),
MixTree: cmptypes.NewPreplayResTrie(),
MixTreeMu: trylock.New(),
DeltaTree: cmptypes.NewPreplayResTrie(),
DeltaTreeMu: trylock.New(),
TraceTrieMu: trylock.New(),
RWRecordTrie: cmptypes.NewPreplayResTrie(),
RWRecordTrieMu: trylock.New(),
wobjectHolderMap: make(state.ObjectHolderMap),
objectPointerToObjID: make(map[uintptr]uintptr),
//RWrecords: RWrecords,
}
txPreplay := &TxPreplay{
TxHash: tx.Hash(),
GasPrice: tx.GasPrice(),
GasLimit: tx.Gas(),
Tx: tx,
Timestamp: time.Now(),
}
txPreplay.PreplayResults, preplayResults.txPreplay = preplayResults, txPreplay
return txPreplay
}
func IsExternalTransfer(seqRecord []*cmptypes.AddrLocValue) bool {
@ -112,7 +120,38 @@ func (t *TxPreplay) SetExternalTransferInfo(record *RWRecord) {
}
}
// CreateRound create new round preplay for tx;
func (t *TxPreplay) StoreWObjects(objMap state.ObjectMap, roundId uint64) WObjectWeakRefMap {
result := make(WObjectWeakRefMap)
for addr, obj := range objMap {
cmptypes.MyAssert(obj != nil)
id := t.PreplayResults.GetOrNewObjectID((uintptr)((unsafe.Pointer)(obj)))
ref := NewWObjectWeakReference(t.TxHash, addr, t.Timestamp, id, roundId)
if _, hasHolder := t.PreplayResults.GetHolder(ref); !hasHolder {
holder := state.NewObjectHolder(obj, id)
t.PreplayResults.SetHolder(ref, holder)
result[addr] = ref
}
}
return result
}
func (t *TxPreplay) RLockRound() {
t.PreplayResults.RoundsMu.RLock()
}
func (t *TxPreplay) RUnlockRound() {
t.PreplayResults.RoundsMu.RUnlock()
}
func (t *TxPreplay) LockRound() {
t.PreplayResults.RoundsMu.Lock()
}
func (t *TxPreplay) UnlockRound() {
t.PreplayResults.RoundsMu.Unlock()
}
// CreateOrGetRound get round by roundID, if not exists, create new round for txPreplay.
func (t *TxPreplay) CreateOrGetRound(roundID uint64) (*PreplayResult, bool) {
round, ok := t.PreplayResults.Rounds.Get(roundID)
if ok {
@ -130,31 +169,7 @@ func (t *TxPreplay) CreateOrGetRound(roundID uint64) (*PreplayResult, bool) {
return roundNew, true // true means this roundId is just created
}
func (t *TxPreplay) StoreWObjects(objMap state.ObjectMap, roundId uint64) WObjectWeakRefMap {
result := make(WObjectWeakRefMap)
for addr, obj := range objMap {
cmptypes.MyAssert(obj != nil)
id := t.PreplayResults.GetOrNewObjectID((uintptr)((unsafe.Pointer)(obj)))
ref := NewWObjectWeakReference(t.TxHash, addr, t.Timestamp, id, roundId)
if _, hasHolder := t.PreplayResults.GetHolder(ref); !hasHolder {
holder := state.NewObjectHolder(obj, id)
t.PreplayResults.SetHolder(ref, holder)
result[addr] = ref
}
}
return result
}
// GetRound get round by roundID
func (t *TxPreplay) GetRound(roundID uint64) (*PreplayResult, bool) {
rawRound, ok := t.PreplayResults.Rounds.Get(roundID)
if !ok {
return nil, false
}
return rawRound.(*PreplayResult), true
}
// PeekRound peek round by roundID
// PeekRound peek round by roundID.
func (t *TxPreplay) PeekRound(roundID uint64) (*PreplayResult, bool) {
rawRound, ok := t.PreplayResults.Rounds.Peek(roundID)
if !ok {
@ -163,19 +178,37 @@ func (t *TxPreplay) PeekRound(roundID uint64) (*PreplayResult, bool) {
return rawRound.(*PreplayResult), true
}
// KeysOfRound get a slice of the roundID, from oldest to newest.
func (t *TxPreplay) KeysOfRound() []uint64 {
var keys []uint64
roundKeys := t.PreplayResults.Rounds.Keys()
for _, raw := range roundKeys {
keys = append(keys, raw.(uint64))
}
return keys
}
// PreplayResults record results of several rounds
type PreplayResults struct {
Rounds *lru.Cache `json:"-"`
RWRecordTrie *cmptypes.PreplayResTrie
ReadDepTree *cmptypes.PreplayResTrie
MixTree *cmptypes.PreplayResTrie
DeltaTree *cmptypes.PreplayResTrie
TraceTrie ITracerTrie
Rounds *lru.Cache `json:"-"`
RoundsMu sync.RWMutex
// deprecated
ReadDepTree *cmptypes.PreplayResTrie `json:"-"`
MixTree *cmptypes.PreplayResTrie
MixTreeMu trylock.TryLocker
TraceTrie ITracerTrie
TraceTrieMu trylock.TryLocker
DeltaTree *cmptypes.PreplayResTrie
DeltaTreeMu trylock.TryLocker
RWRecordTrie *cmptypes.PreplayResTrie
RWRecordTrieMu trylock.TryLocker
IsExternalTransfer bool
DeltaWrites map[common.Address]*WStateDelta
wobjectHolderMap state.ObjectHolderMap
holderMapMutex sync.Mutex
wobjectHolderMapMu sync.Mutex
wobjectIDCounter uintptr
objectPointerToObjID map[uintptr]uintptr
@ -183,11 +216,13 @@ type PreplayResults struct {
RWrecords *lru.Cache `json:"-"`
// deprecated
ReadDeps *lru.Cache `json:"-"`
txPreplay *TxPreplay
}
func (rs *PreplayResults) GetOrNewObjectID(objPointerAsInt uintptr) uintptr {
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
if objID, ok := rs.objectPointerToObjID[objPointerAsInt]; ok {
return objID
}
@ -198,8 +233,8 @@ func (rs *PreplayResults) GetOrNewObjectID(objPointerAsInt uintptr) uintptr {
}
func (rs *PreplayResults) GetAndDeleteHolder(wref *WObjectWeakReference) (*state.ObjectHolder, bool) {
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
holder, hok := rs.wobjectHolderMap.GetAndDelete(wref.ObjectID)
if hok {
objPointerAsInt := uintptr(unsafe.Pointer(holder.Obj))
@ -212,15 +247,15 @@ func (rs *PreplayResults) GetAndDeleteHolder(wref *WObjectWeakReference) (*state
}
func (rs *PreplayResults) GetHolder(wref *WObjectWeakReference) (*state.ObjectHolder, bool) {
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
h, ok := rs.wobjectHolderMap[wref.ObjectID]
return h, ok
}
func (rs *PreplayResults) SetHolder(wref *WObjectWeakReference, holder *state.ObjectHolder) {
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
_, ok := rs.wobjectHolderMap[wref.ObjectID]
cmptypes.MyAssert(!ok)
rs.wobjectHolderMap[wref.ObjectID] = holder
@ -247,8 +282,8 @@ func (rs *PreplayResults) GCWObjects() {
}
}
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
for objID, _ := range rs.wobjectHolderMap {
if !activeHolderIDs[objID] {
holder := rs.wobjectHolderMap[objID]
@ -263,8 +298,8 @@ func (rs *PreplayResults) GCWObjects() {
}
func (rs *PreplayResults) GetWObjectSize() (objectCount uint64, storageItemCount uint64) {
rs.holderMapMutex.Lock()
defer rs.holderMapMutex.Unlock()
rs.wobjectHolderMapMu.Lock()
defer rs.wobjectHolderMapMu.Unlock()
for _, holder := range rs.wobjectHolderMap {
objectCount++
storageItemCount += uint64(len(holder.Obj.GetOriginStorage()))
@ -787,7 +822,7 @@ func (m *TxPreplayMap) RemoveCheapest(remove int) {
}
// GetTxPreplay returns the result of preplay and updates the "recently used"-ness of the key
func (r *GlobalCache) GetTxPreplay(txHash common.Hash) *TxPreplay {
func (r *GlobalCache) GetTxPreplay(txHash interface{}) *TxPreplay {
result, response := r.PreplayCache.Get(txHash)
@ -803,7 +838,7 @@ func (r *GlobalCache) GetTxPreplay(txHash common.Hash) *TxPreplay {
}
// PeekTxPreplay returns the result of preplay and will not update the "recently used"-ness of the key
func (r *GlobalCache) PeekTxPreplay(txHash common.Hash) *TxPreplay {
func (r *GlobalCache) PeekTxPreplay(txHash interface{}) *TxPreplay {
result, response := r.PreplayCache.Peek(txHash)
@ -818,7 +853,11 @@ func (r *GlobalCache) PeekTxPreplay(txHash common.Hash) *TxPreplay {
return nil
}
func (r *GlobalCache) GetTxPreplayLen() int {
func (r *GlobalCache) KeysOfTxPreplay() []interface{} {
return r.PreplayCache.Keys()
}
func (r *GlobalCache) LenOfTxPreplay() int {
return r.PreplayCache.Len()
}
@ -924,6 +963,9 @@ func (r *GlobalCache) SetMainResult(roundID uint64, receipt *types.Receipt, rwRe
return nil, false
}
txPreplay.LockRound()
defer txPreplay.UnlockRound()
round, _ := txPreplay.CreateOrGetRound(roundID)
round.RWrecord = rwRecord
@ -983,8 +1025,8 @@ func (r *GlobalCache) SetExtraResult(roundID uint64, hash common.Hash, currentSt
return false
}
txPreplay.Mu.Lock()
defer txPreplay.Mu.Unlock()
txPreplay.LockRound()
defer txPreplay.UnlockRound()
round, _ := txPreplay.CreateOrGetRound(roundID)

31
optipreplayer/cache/rpc.go поставляемый
Просмотреть файл

@ -34,11 +34,7 @@ type TxDistributionList struct {
// GetDistributionList RPC for get distribution list
func (r *GlobalCache) GetDistributionList(txHash common.Hash) *TxDistributionList {
r.PreplayMu.RLock()
defer r.PreplayMu.RUnlock()
// defult round ID
// default round ID
roundID := uint64(0)
result := &TxDistributionList{}
@ -47,12 +43,12 @@ func (r *GlobalCache) GetDistributionList(txHash common.Hash) *TxDistributionLis
return result
}
var err error
// rawPending, err := r.eth.TxPool().Pending()
//var err error
//rawPending, err := r.eth.TxPool().Pending()
//if err != nil {
// return result
//}
rawPending := map[common.Address]types.Transactions{}
if err != nil {
return result
}
calList := map[uint64]*TxDistributionItem{}
@ -60,19 +56,21 @@ func (r *GlobalCache) GetDistributionList(txHash common.Hash) *TxDistributionLis
for _, tx := range rawPending[acc] {
txHash := tx.Hash()
tx := r.PeekTxPreplay(txHash)
if tx == nil {
txPreplay := r.PeekTxPreplay(txHash)
if txPreplay == nil {
continue
}
round, _ := tx.PeekRound(roundID)
txPreplay.RLockRound()
round, _ := txPreplay.PeekRound(roundID)
// Dont consider will not in & already in
if round.ExtraResult.Status != "will in" {
txPreplay.RUnlockRound()
continue
}
if tx.GasPrice.Cmp(queryTx.GasPrice) >= 0 {
txPrice := tx.GasPrice.Uint64()
if txPreplay.GasPrice.Cmp(queryTx.GasPrice) >= 0 {
txPrice := txPreplay.GasPrice.Uint64()
if dItem, ok := calList[txPrice]; ok {
dItem.GasCount = dItem.GasCount + 1
dItem.GasUsed = dItem.GasUsed + round.Receipt.GasUsed
@ -80,10 +78,11 @@ func (r *GlobalCache) GetDistributionList(txHash common.Hash) *TxDistributionLis
calList[txPrice] = &TxDistributionItem{
GasCount: 1,
GasUsed: round.Receipt.GasUsed,
GasPrice: tx.GasPrice,
GasPrice: txPreplay.GasPrice,
}
}
}
txPreplay.RUnlockRound()
}
}

Просмотреть файл

@ -31,7 +31,8 @@ func NewListener(eth Backend) *Listener {
}
go listener.cacheEvictionLoop()
go listener.commitLoop()
go listener.listenCommitLoop()
go listener.enpoolCommitLoop()
return listener
}
@ -46,7 +47,7 @@ func (l *Listener) cacheEvictionLoop() {
currentBlock := chainHeadEvent.Block
l.blockMap[currentBlock.NumberU64()] = append(l.blockMap[currentBlock.NumberU64()], currentBlock)
oldSize := l.globalCache.GetTxPreplayLen()
oldSize := l.globalCache.LenOfTxPreplay()
startNodeCount, startWObjectSize := l.globalCache.GetTotalNodeCountAndWObjectSize()
inc := oldSize - newSize
@ -57,7 +58,7 @@ func (l *Listener) cacheEvictionLoop() {
l.removeBefore(currentBlock.NumberU64() - 2)
}
l.globalCache.GCWObjects()
saveSize := l.globalCache.GetTxPreplayLen()
saveSize := l.globalCache.LenOfTxPreplay()
nodeCountAfterBigRemove, _ := l.globalCache.GetTotalNodeCountAndWObjectSize()
@ -71,7 +72,7 @@ func (l *Listener) cacheEvictionLoop() {
}
}
newSize = l.globalCache.GetTxPreplayLen()
newSize = l.globalCache.LenOfTxPreplay()
oldRemoved = saveSize - newSize
removed = oldSize - newSize
@ -81,7 +82,7 @@ func (l *Listener) cacheEvictionLoop() {
"removed", fmt.Sprintf("%d(%d)", removed, oldRemoved),
"newSize", newSize, "newNodeCount", endNodeCount, "newWObjectSize", endWObjectSize,
"oldSize", oldSize, "inc", inc, "startNodeCount", startNodeCount, "startWObjectSize", startWObjectSize,
)
)
}
}
@ -98,32 +99,46 @@ func (l *Listener) removeBefore(remove uint64) {
}
}
func (l *Listener) commitLoop() {
func (l *Listener) listenCommitLoop() {
listenTxsCh := make(chan core.ListenTxsEvent, chanSize)
listenTxsSub := l.txPool.SubscribeListenTxsEvent(listenTxsCh)
defer listenTxsSub.Unsubscribe()
for txsEvent := range listenTxsCh {
if len(txsEvent.Txs) == 0 {
continue
}
nowTime := time.Now()
for _, tx := range txsEvent.Txs {
l.globalCache.CommitTxListen(&cache.TxListen{
Tx: tx,
ListenTime: uint64(nowTime.Unix()),
ListenTimeNano: uint64(nowTime.UnixNano()),
})
}
}
}
func (l *Listener) enpoolCommitLoop() {
newTxsCh := make(chan core.NewTxsEvent, chanSize)
newTxsSub := l.txPool.SubscribeNewTxsEvent(newTxsCh)
defer newTxsSub.Unsubscribe()
for txsEvent := range newTxsCh {
l.commitNewTxs(txsEvent.Txs)
}
}
func (l *Listener) commitNewTxs(txs types.Transactions) bool {
if len(txs) == 0 {
return false
}
if len(txsEvent.Txs) == 0 {
continue
}
nowTime := time.Now()
for _, tx := range txs {
l.globalCache.CommitTxListen(&cache.TxListen{
Tx: tx,
ListenTime: uint64(nowTime.Unix()),
ListenTimeNano: uint64(nowTime.UnixNano()),
ConfirmTime: 0,
ConfirmBlockNum: 0,
})
nowTime := uint64(time.Now().UnixNano())
for _, tx := range txsEvent.Txs {
l.globalCache.CommitTxEnpool(tx.Hash(), nowTime)
}
}
return true
}
func (l *Listener) register() (func(*big.Int), func()) {

Просмотреть файл

@ -413,10 +413,13 @@ func (r *MissReporter) SetMissTxn(txn *types.Transaction, node *cmptypes.Preplay
}
}
func (r *MissReporter) ReportMiss(noListen, noPackage, noEnqueue, noPreplay uint64) {
context := []interface{}{"NoListen", noListen, "NoPackage", noPackage, "NoEnqueue", noEnqueue,
func (r *MissReporter) ReportMiss(noListen, noListenAndEthermine, noEnpool, noPackage, noEnqueue, noPreplay uint64) {
context := []interface{}{
"NoListen", fmt.Sprintf("%d(%d)", noListen, noListenAndEthermine),
"NoEnpool", noEnpool, "NoPackage", noPackage, "NoEnqueue", noEnqueue,
"NoPreplay", fmt.Sprintf("%d(%d-%d)", noPreplay, r.noGroupPreplay, r.noExecPreplay),
"miss", fmt.Sprintf("%d(%d:%d:%d)", r.miss, r.txnType[0], r.txnType[1], r.txnType[2])}
"miss", fmt.Sprintf("%d(%d:%d:%d)", r.miss, r.txnType[0], r.txnType[1], r.txnType[2]),
}
if r.groupMissCount > 0 {
context = append(context, "NoGroup", r.groupMissCount)
}
@ -510,10 +513,13 @@ func (p *Preplayer) getGroundGroup(block *types.Block, txnsIndexMap map[common.H
txnHash := txn.Hash()
txPreplay := p.globalCache.PeekTxPreplay(txnHash)
if txPreplay != nil {
txPreplay.RLockRound()
if round, ok := txPreplay.PeekRound(executor.RoundID); ok {
rwrecords[txnHash] = NewRWRecord(round.RWrecord)
txPreplay.RUnlockRound()
continue
}
txPreplay.RUnlockRound()
}
rwrecords[txnHash] = NewRWRecord(nil)
log.Error("Detect nil rwrecord in miss reporter", "txPreplay != nil", txPreplay != nil,
@ -759,7 +765,7 @@ func pickOneGroup(groupMap map[common.Hash]*TxnGroup) *TxnGroup {
}
func getInterfaceValue(value interface{}) string {
if value == nil{ // miss_value is null
if value == nil { // miss_value is null
return ""
}
val := reflect.ValueOf(value)

Просмотреть файл

@ -303,9 +303,11 @@ func (p *Preplayer) commitNewWork(task *TxnGroup, txnOrder TxnOrder, forecastHea
var rounds = make([]*cache.PreplayResult, 0, len(executor.executionOrder))
for _, tx := range executor.executionOrder {
if txPreplay := p.globalCache.PeekTxPreplay(tx.Hash()); txPreplay != nil {
txPreplay.RLockRound()
if round, _ := txPreplay.PeekRound(executor.RoundID); round != nil {
rounds = append(rounds, round)
}
txPreplay.RUnlockRound()
}
}

Просмотреть файл

@ -20,9 +20,9 @@ const (
// preplayLimitForRemain is the upper limit of preplay count for transactions in remain pool.
preplayLimitForRemain = 2
// remainTxnLimit is the limit of remain txn preplay count.
remainTxnLimit = 20
remainTxnLimit = 40
// remainTxnTotalLimit is the total limit of remain txn preplay count between two blocks.
remainTxnTotalLimit = 200
remainTxnTotalLimit = 800
)
type TaskBuilder struct {
@ -726,9 +726,11 @@ func (b *TaskBuilder) updateDependency(roundID uint64, pool TransactionPool) {
for _, txn := range txns {
txnHash := txn.Hash()
if txPreplay := b.globalCache.PeekTxPreplay(txnHash); txPreplay != nil {
txPreplay.RLockRound()
if round, ok := txPreplay.PeekRound(roundID); ok {
b.insertRWRecord(txnHash, NewRWRecord(round.RWrecord))
}
txPreplay.RUnlockRound()
}
}
}