From e30313aa2cd7ebb0108147b972770b610cc89092 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 27 Apr 2011 22:32:35 -0700 Subject: [PATCH 1/3] Added DiskSpillingCache DiskSpillingCache is a BoundedMemoryCache that spills entries to disk when it runs out of space. Currently the implementation is very simple. In particular, it's missing the following features: - Error handling for disk I/O, including checking of disk space levels - Bringing an entry back into memory after fetching it from disk In addition, here are some features that aren't critical but should be implemented soon: - Spilling based on a user-set priority in addition to LRU - Caching into a subdirectory of spark.DiskSpillingCache.cacheDir rather than the root directory --- .../main/scala/spark/BoundedMemoryCache.scala | 8 ++- .../main/scala/spark/DiskSpillingCache.scala | 61 +++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) create mode 100644 core/src/main/scala/spark/DiskSpillingCache.scala diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala index 19d9bebfe5..10143d3dd2 100644 --- a/core/src/main/scala/spark/BoundedMemoryCache.scala +++ b/core/src/main/scala/spark/BoundedMemoryCache.scala @@ -60,10 +60,14 @@ class BoundedMemoryCache extends Cache with Logging { val iter = map.entrySet.iterator while (maxBytes - currentBytes < space && iter.hasNext) { val mapEntry = iter.next() - logInfo("Dropping key %s of size %d to make space".format( - mapEntry.getKey, mapEntry.getValue.size)) + dropEntry(mapEntry.getKey, mapEntry.getValue) currentBytes -= mapEntry.getValue.size iter.remove() } } + + protected def dropEntry(key: Any, entry: Entry) { + logInfo("Dropping key %s of size %d to make space".format( + key, entry.size)) + } } diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala new file mode 100644 index 0000000000..c40159d66e --- /dev/null +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -0,0 +1,61 @@ +package spark + +import java.io.File +import java.io.{FileOutputStream,FileInputStream} +import java.util.LinkedHashMap +import java.util.UUID + +// TODO: error handling +// TODO: cache into a separate directory using Utils.createTempDir +// TODO: after reading an entry from disk, put it into the cache + +class DiskSpillingCache extends BoundedMemoryCache { + private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true) + + override def get(key: Any): Any = { + synchronized { + val ser = Serializer.newInstance() + super.get(key) match { + case bytes: Any => // found in memory + ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + + case _ => diskMap.get(key) match { + case file: Any => // found on disk + val startTime = System.currentTimeMillis + val bytes = new Array[Byte](file.length.toInt) + new FileInputStream(file).read(bytes) + val timeTaken = System.currentTimeMillis - startTime + logInfo("Reading key %s of size %d bytes from disk took %d ms".format( + key, file.length, timeTaken)) + ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + + case _ => // not found + null + } + } + } + } + + override def put(key: Any, value: Any) { + var ser = Serializer.newInstance() + super.put(key, ser.serialize(value)) + } + + /** + * Spill least recently used entries to disk until at least space + * bytes are free. Assumes that a lock is held on the DiskSpillingCache. + * Assumes that entry.value is a byte array. 
+ */ + override protected def dropEntry(key: Any, entry: Entry) { + logInfo("Spilling key %s of size %d to make space".format( + key, entry.size)) + val cacheDir = System.getProperty( + "spark.DiskSpillingCache.cacheDir", + System.getProperty("java.io.tmpdir")) + val file = new File(cacheDir, "spark-dsc-" + UUID.randomUUID.toString) + val stream = new FileOutputStream(file) + stream.write(entry.value.asInstanceOf[Array[Byte]]) + stream.close() + diskMap.put(key, file) + } +} From 12ff0d2dc30547052e4c364e5cb9a2f4641651da Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 27 Apr 2011 22:59:05 -0700 Subject: [PATCH 2/3] Bring an entry back into memory after fetching it from disk --- core/src/main/scala/spark/DiskSpillingCache.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala index c40159d66e..06359254b9 100644 --- a/core/src/main/scala/spark/DiskSpillingCache.scala +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -27,6 +27,7 @@ class DiskSpillingCache extends BoundedMemoryCache { val timeTaken = System.currentTimeMillis - startTime logInfo("Reading key %s of size %d bytes from disk took %d ms".format( key, file.length, timeTaken)) + super.put(key, bytes) ser.deserialize(bytes.asInstanceOf[Array[Byte]]) case _ => // not found @@ -42,9 +43,8 @@ class DiskSpillingCache extends BoundedMemoryCache { } /** - * Spill least recently used entries to disk until at least space - * bytes are free. Assumes that a lock is held on the DiskSpillingCache. - * Assumes that entry.value is a byte array. + * Spill the given entry to disk. Assumes that a lock is held on the + * DiskSpillingCache. Assumes that entry.value is a byte array. */ override protected def dropEntry(key: Any, entry: Entry) { logInfo("Spilling key %s of size %d to make space".format( From a4c04f3f6f98fb62bb23cde3778d0b13eebb1d99 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 27 Apr 2011 23:23:29 -0700 Subject: [PATCH 3/3] Error handling for disk I/O in DiskSpillingCache Also renamed the property spark.DiskSpillingCache.cacheDir to spark.diskSpillingCache.cacheDir in order to follow conventions. 
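(Editor's note, not part of the patch: a minimal usage sketch of the renamed
property and the resulting put/get round trip. The directory path and key are
made up, and the cache is constructed directly only for illustration; inside
Spark it would normally be created by the framework with a serializer already
configured.)

    // Point the spill directory at an explicit location; when the property is
    // unset, the code falls back to java.io.tmpdir.
    System.setProperty("spark.diskSpillingCache.cacheDir", "/tmp/spark-cache")

    val cache = new DiskSpillingCache
    cache.put("rdd_0_split_0", Array.fill(4 * 1024 * 1024)(1.toByte))

    // Served from memory, or read back from disk (and re-cached) if the entry
    // was spilled; returns null only on a miss or an unreadable disk copy.
    val value = cache.get("rdd_0_split_0")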
--- .../main/scala/spark/DiskSpillingCache.scala | 45 ++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala index 06359254b9..9e52fee69e 100644 --- a/core/src/main/scala/spark/DiskSpillingCache.scala +++ b/core/src/main/scala/spark/DiskSpillingCache.scala @@ -2,12 +2,12 @@ package spark import java.io.File import java.io.{FileOutputStream,FileInputStream} +import java.io.IOException import java.util.LinkedHashMap import java.util.UUID -// TODO: error handling // TODO: cache into a separate directory using Utils.createTempDir -// TODO: after reading an entry from disk, put it into the cache +// TODO: clean up disk cache afterwards class DiskSpillingCache extends BoundedMemoryCache { private val diskMap = new LinkedHashMap[Any, File](32, 0.75f, true) @@ -21,14 +21,22 @@ class DiskSpillingCache extends BoundedMemoryCache { case _ => diskMap.get(key) match { case file: Any => // found on disk - val startTime = System.currentTimeMillis - val bytes = new Array[Byte](file.length.toInt) - new FileInputStream(file).read(bytes) - val timeTaken = System.currentTimeMillis - startTime - logInfo("Reading key %s of size %d bytes from disk took %d ms".format( - key, file.length, timeTaken)) - super.put(key, bytes) - ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + try { + val startTime = System.currentTimeMillis + val bytes = new Array[Byte](file.length.toInt) + new FileInputStream(file).read(bytes) + val timeTaken = System.currentTimeMillis - startTime + logInfo("Reading key %s of size %d bytes from disk took %d ms".format( + key, file.length, timeTaken)) + super.put(key, bytes) + ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + } catch { + case e: IOException => + logWarning("Failed to read key %s from disk at %s: %s".format( + key, file.getPath(), e.getMessage())) + diskMap.remove(key) // remove dead entry + null + } case _ => // not found null @@ -50,12 +58,19 @@ class DiskSpillingCache extends BoundedMemoryCache { logInfo("Spilling key %s of size %d to make space".format( key, entry.size)) val cacheDir = System.getProperty( - "spark.DiskSpillingCache.cacheDir", + "spark.diskSpillingCache.cacheDir", System.getProperty("java.io.tmpdir")) val file = new File(cacheDir, "spark-dsc-" + UUID.randomUUID.toString) - val stream = new FileOutputStream(file) - stream.write(entry.value.asInstanceOf[Array[Byte]]) - stream.close() - diskMap.put(key, file) + try { + val stream = new FileOutputStream(file) + stream.write(entry.value.asInstanceOf[Array[Byte]]) + stream.close() + diskMap.put(key, file) + } catch { + case e: IOException => + logWarning("Failed to spill key %s to disk at %s: %s".format( + key, file.getPath(), e.getMessage())) + // Do nothing and let the entry be discarded + } } }
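(Editor's note, not part of the patch series: a small self-contained sketch of
the access-ordered LinkedHashMap idiom that diskMap uses, and that the LRU
eviction described in the first patch relies on. The keys and values below are
arbitrary.)

    import java.util.LinkedHashMap

    // accessOrder = true (the third constructor argument) reorders entries on
    // every get/put, so iteration runs from least to most recently used.
    val m = new LinkedHashMap[Any, Any](32, 0.75f, true)
    m.put("a", 1)
    m.put("b", 2)
    m.put("c", 3)
    m.get("a") // touching "a" moves it to the most-recently-used end

    // Iteration order is now b, c, a, so an eviction loop like the one in
    // BoundedMemoryCache.ensureFreeSpace drops "b" (the LRU entry) first,
    // which in DiskSpillingCache means "b" is the first entry spilled to disk.
    val it = m.entrySet.iterator
    while (it.hasNext) println(it.next().getKey)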