Merge pull request #311 from woggling/map-output-npe

Fix NullPointerException when map output unregistered from MapOutputTracker twice
Matei Zaharia 2012-11-27 20:50:48 -08:00
Parents: 60cb3e9380 cf79de425d
Commit: 935c468b71
2 changed files with 52 additions and 1 deletion
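Context for the fix: when two fetch failures for the same map output race, MapOutputTracker.unregisterMapOutput can run twice for the same slot. The first call sets the slot to null; the second dereferences it and throws a NullPointerException. A minimal standalone sketch of the failure mode, using a hypothetical Status class and unregister helper rather than Spark's actual types:

case class Status(address: String)

object DoubleUnregisterRepro {
  def main(args: Array[String]): Unit = {
    val array = new Array[Status](1)   // one map-output slot per map task
    array(0) = Status("hostA:1000")

    def unregister(mapId: Int, address: String): Unit = array.synchronized {
      // Pre-fix logic: dereferences the slot without a null guard.
      if (array(mapId).address == address) {
        array(mapId) = null            // first call clears the slot
      }
    }

    unregister(0, "hostA:1000")        // first fetch failure: slot cleared
    unregister(0, "hostA:1000")        // second failure: NullPointerException
  }
}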

core/src/main/scala/spark/MapOutputTracker.scala

@@ -111,7 +111,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean
     var array = mapStatuses.get(shuffleId)
     if (array != null) {
       array.synchronized {
-        if (array(mapId).address == bmAddress) {
+        if (array(mapId) != null && array(mapId).address == bmAddress) {
           array(mapId) = null
         }
       }
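The one-line fix is safe because Scala's && operator short-circuits: the right operand is evaluated only when the left is true, so array(mapId).address is never reached once the slot is null, and a repeated unregistration becomes a no-op. A tiny standalone illustration of the guard pattern (names are invented for the example):

object ShortCircuitGuard extends App {
  val slot: String = null
  // slot.length is never evaluated when slot is null, so no NPE is thrown.
  if (slot != null && slot.length > 0) println("slot in use") else println("slot cleared or empty")
}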

core/src/test/scala/spark/MapOutputTrackerSuite.scala

@@ -2,6 +2,10 @@ package spark
 
 import org.scalatest.FunSuite
 
+import akka.actor._
+import spark.scheduler.MapStatus
+import spark.storage.BlockManagerId
+
 class MapOutputTrackerSuite extends FunSuite {
   test("compressSize") {
     assert(MapOutputTracker.compressSize(0L) === 0)
@@ -22,4 +26,51 @@ class MapOutputTrackerSuite extends FunSuite {
         "size " + size + " decompressed to " + size2 + ", which is out of range")
     }
   }
+
+  test("master start and stop") {
+    val actorSystem = ActorSystem("test")
+    val tracker = new MapOutputTracker(actorSystem, true)
+    tracker.stop()
+  }
+
+  test("master register and fetch") {
+    val actorSystem = ActorSystem("test")
+    val tracker = new MapOutputTracker(actorSystem, true)
+    tracker.registerShuffle(10, 2)
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+    tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+        Array(compressedSize1000, compressedSize10000)))
+    tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+        Array(compressedSize10000, compressedSize1000)))
+    val statuses = tracker.getServerStatuses(10, 0)
+    assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
+                                  (new BlockManagerId("hostB", 1000), size10000)))
+    tracker.stop()
+  }
+
+  test("master register and unregister and fetch") {
+    val actorSystem = ActorSystem("test")
+    val tracker = new MapOutputTracker(actorSystem, true)
+    tracker.registerShuffle(10, 2)
+    val compressedSize1000 = MapOutputTracker.compressSize(1000L)
+    val compressedSize10000 = MapOutputTracker.compressSize(10000L)
+    val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
+    val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
+    tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+        Array(compressedSize1000, compressedSize1000, compressedSize1000)))
+    tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+        Array(compressedSize10000, compressedSize1000, compressedSize1000)))
+
+    // As if we had two simultaneous fetch failures
+    tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+
+    // The remaining reduce task might try to grab the output despite the shuffle failure;
+    // this should cause it to fail, and the scheduler will ignore the failure due to the
+    // stage already being aborted.
+    intercept[Exception] { tracker.getServerStatuses(10, 1) }
+  }
 }
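The final test exercises exactly the double-unregister race fixed above, then asserts the post-failure behavior with ScalaTest's intercept, which runs a block, fails the test unless the block throws the given exception type, and returns the caught exception. A minimal self-contained example of the pattern (the suite name and message are invented for illustration):

import org.scalatest.FunSuite

class InterceptPatternSuite extends FunSuite {
  test("intercept returns the thrown exception") {
    val e = intercept[IllegalStateException] {
      throw new IllegalStateException("missing output location")
    }
    // The returned exception can be inspected further if needed.
    assert(e.getMessage === "missing output location")
  }
}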