From 5fa868b98bfd57b4feeed127ea68635f4fd909f9 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 27 Nov 2012 12:50:40 -0800 Subject: [PATCH 1/2] Tests for MapOutputTracker. --- .../scala/spark/MapOutputTrackerSuite.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala index 4e9717d871..529445e861 100644 --- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -2,6 +2,10 @@ package spark import org.scalatest.FunSuite +import akka.actor._ +import spark.scheduler.MapStatus +import spark.storage.BlockManagerId + class MapOutputTrackerSuite extends FunSuite { test("compressSize") { assert(MapOutputTracker.compressSize(0L) === 0) @@ -22,4 +26,51 @@ class MapOutputTrackerSuite extends FunSuite { "size " + size + " decompressed to " + size2 + ", which is out of range") } } + + test("master start and stop") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.stop() + } + + test("master register and fetch") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.registerShuffle(10, 2) + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val compressedSize10000 = MapOutputTracker.compressSize(10000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + val size10000 = MapOutputTracker.decompressSize(compressedSize10000) + tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000), + Array(compressedSize1000, compressedSize10000))) + tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000), + Array(compressedSize10000, compressedSize1000))) + val statuses = tracker.getServerStatuses(10, 0) + assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000), + (new BlockManagerId("hostB", 1000), size10000))) + tracker.stop() + } + + test("master register and unregister and fetch") { + val actorSystem = ActorSystem("test") + val tracker = new MapOutputTracker(actorSystem, true) + tracker.registerShuffle(10, 2) + val compressedSize1000 = MapOutputTracker.compressSize(1000L) + val compressedSize10000 = MapOutputTracker.compressSize(10000L) + val size1000 = MapOutputTracker.decompressSize(compressedSize1000) + val size10000 = MapOutputTracker.decompressSize(compressedSize10000) + tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000), + Array(compressedSize1000, compressedSize1000, compressedSize1000))) + tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000), + Array(compressedSize10000, compressedSize1000, compressedSize1000))) + + // As if we had two simulatenous fetch failures + tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000)) + tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000)) + + // The remaining reduce task might try to grab the output dispite the shuffle failure; + // this should cause it to fail, and the scheduler will ignore the failure due to the + // stage already being aborted. + intercept[Exception] { tracker.getServerStatuses(10, 1) } + } } From cf79de425d1f4c4b92c58a2632766a6cbc072735 Mon Sep 17 00:00:00 2001 From: Charles Reiss Date: Tue, 27 Nov 2012 13:55:56 -0800 Subject: [PATCH 2/2] Fix NullPointerException when unregistering a map output twice. --- core/src/main/scala/spark/MapOutputTracker.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 45441aa5e5..9711987ac2 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -110,7 +110,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea var array = mapStatuses.get(shuffleId) if (array != null) { array.synchronized { - if (array(mapId).address == bmAddress) { + if (array(mapId) != null && array(mapId).address == bmAddress) { array(mapId) = null } }