This commit is contained in:
Mosharaf Chowdhury 2011-04-27 22:00:24 -07:00
Parent 0567646180
Commit e898e108a3
6 changed files with 41 additions and 112 deletions

View file

@@ -222,7 +222,8 @@ case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }
@serializable
case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
val totalBlocks: Int, val totalBytes: Int) {
val totalBlocks: Int,
val totalBytes: Int) {
@transient var hasBlocks = 0
}

View file

@@ -469,31 +469,9 @@ object CustomBlockedLocalFileShuffle extends Logging {
nextShuffleId.getAndIncrement()
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
return t
}
}
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory(newDaemonThreadFactory)
return threadPool
}
class ShuffleServer
extends Thread with Logging {
var threadPool =
newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var serverSocket: ServerSocket = null
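The hunk above drops this file's private copies of newDaemonThreadFactory and newDaemonFixedThreadPool and calls the shared helper on the Shuffle object instead. A minimal sketch of the consolidated helpers, assembled from the removed lines (shown here as a standalone object purely for illustration; the real code lives on the Shuffle object):

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}

object DaemonThreadPools {
  // Returns a standard ThreadFactory except all threads are daemons
  private def newDaemonThreadFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = Executors.defaultThreadFactory.newThread(r)
      t.setDaemon(true)
      t
    }
  }

  // Wrapper over newFixedThreadPool that installs the daemon factory
  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
    val pool = Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
    pool.setThreadFactory(newDaemonThreadFactory)
    pool
  }
}

Callers then become one-liners, e.g. var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxTxConnections), which is exactly the replacement shown in this hunk.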

View file

@@ -396,31 +396,9 @@ object CustomParallelLocalFileShuffle extends Logging {
nextShuffleId.getAndIncrement()
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
return t
}
}
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory(newDaemonThreadFactory)
return threadPool
}
class ShuffleServer
extends Thread with Logging {
var threadPool =
newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var threadPool = Shuffle.newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
var serverSocket: ServerSocket = null
@@ -533,7 +511,7 @@ object CustomParallelLocalFileShuffle extends Logging {
} finally {
logInfo("ShuffleServerThread is closing streams and sockets")
ois.close()
// TODO: Following can cause "java.net.SocketException: Socket closed"
// FIXME: Following can cause "java.net.SocketException: Socket closed"
oos.close()
bos.close()
clientSocket.close()
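The FIXME above flags that closing oos, bos and the socket after ois.close() can throw java.net.SocketException: Socket closed. One conventional workaround, not applied in this commit, is to guard each close independently so one failure does not skip the remaining cleanup; a self-contained sketch:

import java.io.{Closeable, IOException}

// Sketch only: close a resource and swallow the IOException that surfaces
// as "java.net.SocketException: Socket closed" when the peer is already gone.
def closeQuietly(c: Closeable): Unit = {
  if (c != null) {
    try {
      c.close()
    } catch {
      case e: IOException => () // already closed; nothing useful to do here
    }
  }
}

The streams in the finally block would then be released with closeQuietly(ois); closeQuietly(oos); closeQuietly(bos), and the same try/catch pattern applies to clientSocket.close().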

View file

@@ -370,25 +370,4 @@ object HttpParallelLocalFileShuffle extends Logging {
def newShuffleId(): Long = {
nextShuffleId.getAndIncrement()
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
return t
}
}
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory(newDaemonThreadFactory)
return threadPool
}
}

View file

@@ -25,24 +25,15 @@ trait Shuffle[K, V, C] {
*/
object Shuffle
extends Logging {
// Tracker communication constants
val ReducerEntering = 0
val ReducerLeaving = 1
// ShuffleTracker info
private var MasterHostAddress_ = System.getProperty(
"spark.shuffle.masterHostAddress", InetAddress.getLocalHost.getHostAddress)
private var MasterTrackerPort_ = System.getProperty(
"spark.shuffle.masterTrackerPort", "22222").toInt
private var BlockSize_ = System.getProperty(
"spark.shuffle.blockSize", "1024").toInt * 1024
"spark.shuffle.blockSize", "4096").toInt * 1024
// Used throughout the code for small and large waits/timeouts
private var MinKnockInterval_ = System.getProperty(
"spark.shuffle.minKnockInterval", "1000").toInt
private var MaxKnockInterval_ = System.getProperty(
"spark.shuffle.maxKnockInterval", "5000").toInt
"spark.shuffle.maxKnockInterval", "2000").toInt
// Maximum number of connections
private var MaxRxConnections_ = System.getProperty(
@@ -52,17 +43,10 @@ extends Logging {
// Upper limit on receiving in blocked implementations (whichever comes first)
private var MaxChatTime_ = System.getProperty(
"spark.shuffle.maxChatTime", "250").toInt
"spark.shuffle.maxChatTime", "500").toInt
private var MaxChatBlocks_ = System.getProperty(
"spark.shuffle.maxChatBlocks", "1024").toInt
// A reducer is throttled if it is this much faster
private var ThrottleFraction_ = System.getProperty(
"spark.shuffle.throttleFraction", "2.0").toDouble
def MasterHostAddress = MasterHostAddress_
def MasterTrackerPort = MasterTrackerPort_
def BlockSize = BlockSize_
def MinKnockInterval = MinKnockInterval_
@@ -74,8 +58,6 @@ extends Logging {
def MaxChatTime = MaxChatTime_
def MaxChatBlocks = MaxChatBlocks_
def ThrottleFraction = ThrottleFraction_
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
@@ -106,25 +88,4 @@ extends Logging {
return threadPool
}
}
@serializable
case class SplitInfo(val hostAddress: String, val listenPort: Int,
val splitId: Int) {
var hasSplits = 0
var hasSplitsBitVector: BitSet = null
// Used by mappers of dim |numOutputSplits|
var totalBlocksPerOutputSplit: Array[Int] = null
// Used by reducers of dim |numInputSplits|
var hasBlocksPerInputSplit: Array[Int] = null
}
object SplitInfo {
// Constants for special values of listenPort
val MappersBusy = -1
// Other constants
val UnusedParam = 0
}
}
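These knobs are plain Java system properties read once when the Shuffle object is initialized, with the defaults shown in the hunks above. A hedged example of overriding a few of them before Spark starts; the property names come from the code, the values and unit comments are assumptions:

// Either pass JVM flags such as -Dspark.shuffle.blockSize=4096, or set the
// properties programmatically before the Shuffle object is first referenced,
// since the defaults are captured at object initialization.
System.setProperty("spark.shuffle.blockSize", "4096")        // in KB; multiplied by 1024 internally
System.setProperty("spark.shuffle.maxKnockInterval", "2000") // large wait/timeout, presumably milliseconds
System.setProperty("spark.shuffle.maxChatTime", "500")       // per-connection receive limit, presumably milliseconds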

View file

@@ -0,0 +1,32 @@
package spark.shuffle
import java.util.BitSet
/**
* Used to keep and pass around information of output splits during shuffle
*/
@serializable
case class SplitInfo(val hostAddress: String,
val listenPort: Int,
val splitId: Int) {
var hasSplits = 0
var hasSplitsBitVector: BitSet = null
// Used by mappers of dim |numOutputSplits|
var totalBlocksPerOutputSplit: Array[Int] = null
// Used by reducers of dim |numInputSplits|
var hasBlocksPerInputSplit: Array[Int] = null
}
/**
* Helper Object of SplitInfo for its constants
*/
object SplitInfo {
// Constants for special values of listenPort
val MappersBusy = -1
// Other constants
val UnusedParam = 0
}
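With SplitInfo now in its own file, callers simply import spark.shuffle.SplitInfo. A minimal usage sketch with placeholder values; the host, port and array size below are made up, only the fields and constants come from the class above:

import java.util.BitSet
import spark.shuffle.SplitInfo

// Hypothetical bookkeeping entry for one output split (placeholder values).
val info = SplitInfo("127.0.0.1", 11111, 0)
info.hasSplitsBitVector = new BitSet(8)          // bit vector of received splits (placeholder size)
info.hasBlocksPerInputSplit = new Array[Int](8)  // dim |numInputSplits|, per the comment in the class

// listenPort can also carry the special marker value from the companion object:
val busy = SplitInfo("127.0.0.1", SplitInfo.MappersBusy, 0)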