зеркало из https://github.com/microsoft/spark.git
Some tweaks to make Kryo cache work better
This commit is contained in:
Родитель
7febdfbe29
Коммит
ff5b13799a
|
@ -2,7 +2,7 @@ package spark
|
|||
|
||||
import java.io.{File, FileOutputStream}
|
||||
import java.net.{URI, URL, URLClassLoader}
|
||||
import java.util.concurrent.{Executors, ExecutorService}
|
||||
import java.util.concurrent._
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
|
@ -32,7 +32,7 @@ class Executor extends mesos.Executor with Logging {
|
|||
Thread.currentThread.setContextClassLoader(classLoader)
|
||||
|
||||
// Start worker thread pool (they will inherit our context ClassLoader)
|
||||
threadPool = Executors.newCachedThreadPool()
|
||||
threadPool = new ThreadPoolExecutor(1, 128, 600, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
|
||||
}
|
||||
|
||||
override def launchTask(d: ExecutorDriver, desc: TaskDescription) {
|
||||
|
|
|
@ -111,11 +111,11 @@ class KryoSerialization extends SerializationStrategy with Logging {
|
|||
val kryo = createKryo()
|
||||
|
||||
val threadBuf = new ThreadLocal[ObjectBuffer] {
|
||||
override def initialValue = new ObjectBuffer(kryo, 128*1024*1024)
|
||||
override def initialValue = new ObjectBuffer(kryo, 257*1024*1024)
|
||||
}
|
||||
|
||||
val threadByteBuf = new ThreadLocal[ByteBuffer] {
|
||||
override def initialValue = ByteBuffer.allocate(128*1024*1024)
|
||||
override def initialValue = ByteBuffer.allocate(257*1024*1024)
|
||||
}
|
||||
|
||||
def createKryo(): Kryo = {
|
||||
|
|
Загрузка…
Ссылка в новой задаче