зеркало из https://github.com/microsoft/spark.git
Fix passing of superstep in Bagel to avoid seeing new values of the
superstep value upon recomputation, and set the default storage level in Bagel to MEMORY_AND_DISK
This commit is contained in:
Родитель
eed54a25d8
Коммит
adba773fab
|
@ -7,8 +7,7 @@ import scala.collection.mutable.ArrayBuffer
|
|||
import storage.StorageLevel
|
||||
|
||||
object Bagel extends Logging {
|
||||
|
||||
val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
|
||||
val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_AND_DISK
|
||||
|
||||
/**
|
||||
* Runs a Bagel program.
|
||||
|
@ -63,8 +62,9 @@ object Bagel extends Logging {
|
|||
val combinedMsgs = msgs.combineByKey(
|
||||
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
|
||||
val grouped = combinedMsgs.groupWith(verts)
|
||||
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
|
||||
val (processed, numMsgs, numActiveVerts) =
|
||||
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
|
||||
comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep_), storageLevel)
|
||||
|
||||
val timeTaken = System.currentTimeMillis - startTime
|
||||
logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
|
||||
|
|
Загрузка…
Ссылка в новой задаче