зеркало из https://github.com/microsoft/spark.git
Renamed MaxRxPeers to MaxTxPeers to MaxTxSlots and MaxRxSlots
respectively for clarity (most probably they were misunderstood and misused)
This commit is contained in:
Родитель
b67a968b5d
Коммит
a8f47a62b9
|
@ -67,7 +67,7 @@ extends Broadcast[T] with Logging {
|
|||
// val out = new ObjectOutputStream(BroadcastCH.openFileForWriting(uuid))
|
||||
// out.writeObject(value_)
|
||||
// out.close()
|
||||
// TODO: Fix this at some point
|
||||
// FIXME: Fix this at some point
|
||||
hasCopyInHDFS = true
|
||||
|
||||
// Create a variableInfo object and store it in valueInfos
|
||||
|
@ -476,7 +476,7 @@ extends Broadcast[T] with Logging {
|
|||
pcController.start()
|
||||
logInfo("PeerChatterController started...")
|
||||
|
||||
// TODO: Must fix this. This might never break if broadcast fails.
|
||||
// FIXME: Must fix this. This might never break if broadcast fails.
|
||||
// We should be able to break and send false. Also need to kill threads
|
||||
while (hasBlocks.get < totalBlocks) {
|
||||
Thread.sleep(Broadcast.MaxKnockInterval)
|
||||
|
@ -493,12 +493,11 @@ extends Broadcast[T] with Logging {
|
|||
private var blocksInRequestBitVector = new BitSet(totalBlocks)
|
||||
|
||||
override def run: Unit = {
|
||||
var threadPool =
|
||||
Broadcast.newDaemonFixedThreadPool(Broadcast.MaxTxPeers)
|
||||
var threadPool = Broadcast.newDaemonFixedThreadPool(Broadcast.MaxRxSlots)
|
||||
|
||||
while (hasBlocks.get < totalBlocks) {
|
||||
var numThreadsToCreate =
|
||||
math.min(listOfSources.size, Broadcast.MaxTxPeers) -
|
||||
math.min(listOfSources.size, Broadcast.MaxRxSlots) -
|
||||
threadPool.getActiveCount
|
||||
|
||||
while (hasBlocks.get < totalBlocks && numThreadsToCreate > 0) {
|
||||
|
@ -1105,9 +1104,8 @@ extends Broadcast[T] with Logging {
|
|||
|
||||
class ServeMultipleRequests
|
||||
extends Thread with Logging {
|
||||
// Server at most Broadcast.MaxRxPeers peers
|
||||
var threadPool =
|
||||
Broadcast.newDaemonFixedThreadPool(Broadcast.MaxRxPeers)
|
||||
// Server at most Broadcast.MaxTxSlots peers
|
||||
var threadPool = Broadcast.newDaemonFixedThreadPool(Broadcast.MaxTxSlots)
|
||||
|
||||
override def run: Unit = {
|
||||
var serverSocket = new ServerSocket(0)
|
||||
|
@ -1266,19 +1264,6 @@ extends Logging {
|
|||
|
||||
private var trackMV: TrackMultipleValues = null
|
||||
|
||||
private var MaxPeersInGuideResponse_ = 4
|
||||
|
||||
// Maximum number of receiving and sending threads of a peer
|
||||
private var MaxRxPeers_ = 4
|
||||
private var MaxTxPeers_ = 4
|
||||
|
||||
// Peers can char at most this milliseconds or transfer this number of blocks
|
||||
private var MaxChatTime_ = 500
|
||||
private var MaxChatBlocks_ = 1024
|
||||
|
||||
// Fraction of blocks to receive before entering the end game
|
||||
private var EndGameFraction_ = 0.95
|
||||
|
||||
def initialize(isMaster__ : Boolean): Unit = {
|
||||
synchronized {
|
||||
if (!initialized) {
|
||||
|
|
|
@ -88,10 +88,10 @@ extends Logging {
|
|||
private var MaxPeersInGuideResponse_ = System.getProperty(
|
||||
"spark.broadcast.maxPeersInGuideResponse", "4").toInt
|
||||
|
||||
private var MaxRxPeers_ = System.getProperty(
|
||||
"spark.broadcast.maxRxPeers", "4").toInt
|
||||
private var MaxTxPeers_ = System.getProperty(
|
||||
"spark.broadcast.maxTxPeers", "4").toInt
|
||||
private var MaxRxSlots_ = System.getProperty(
|
||||
"spark.broadcast.maxRxSlots", "4").toInt
|
||||
private var MaxTxSlots_ = System.getProperty(
|
||||
"spark.broadcast.maxTxSlots", "4").toInt
|
||||
|
||||
private var MaxChatTime_ = System.getProperty(
|
||||
"spark.broadcast.maxChatTime", "500").toInt
|
||||
|
@ -123,8 +123,8 @@ extends Logging {
|
|||
// BitTorrentBroadcast configs
|
||||
def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
|
||||
|
||||
def MaxRxPeers = MaxRxPeers_
|
||||
def MaxTxPeers = MaxTxPeers_
|
||||
def MaxRxSlots = MaxRxSlots_
|
||||
def MaxTxSlots = MaxTxSlots_
|
||||
|
||||
def MaxChatTime = MaxChatTime_
|
||||
def MaxChatBlocks = MaxChatBlocks_
|
||||
|
|
Загрузка…
Ссылка в новой задаче