- When a resourceOffers() call has multiple offers, force the TaskSets
  to consider them in increasing order of locality levels so that they
  get a chance to launch tasks locally across all offers (sketched below)

- Simplify ClusterScheduler.prioritizeContainers

- Add docs on the new configuration options
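
The first bullet is easiest to read as code. Below is a minimal, self-contained sketch of the new offer-ordering loop; TaskLocality, WorkerOffer, TaskSetLike and the one-CPU-per-task bookkeeping are simplified stand-ins (not Spark's real classes), and the actual implementation is in the ClusterScheduler diff further down.

// Minimal sketch (hypothetical stub types, not Spark's real classes) of the new
// offer ordering: every TaskSet sweeps all offers at the most local level first,
// and only then is allowed to consider less local levels.
import scala.collection.mutable.ArrayBuffer

object LocalityOrderSketch {
  // Stand-in for Spark's TaskLocality enumeration, ordered from most to least local.
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }

  case class WorkerOffer(executorId: String, host: String, cores: Int)
  case class TaskDescription(name: String)

  // Stand-in for a TaskSetManager: may return a task only if it has one that is
  // at least as local as maxLocality on the given executor/host.
  trait TaskSetLike {
    def resourceOffer(execId: String, host: String, availableCpus: Int,
                      maxLocality: TaskLocality.Value): Option[TaskDescription]
  }

  def scheduleOnce(sortedTaskSets: Seq[TaskSetLike], offers: Seq[WorkerOffer])
      : Seq[ArrayBuffer[TaskDescription]] = {
    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = offers.map(_.cores).toArray
    var launchedTask = false
    // Take each TaskSet in scheduling order, then step through locality levels;
    // at each level, keep cycling over all offers until nothing more launches.
    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
      do {
        launchedTask = false
        for (i <- 0 until offers.size if availableCpus(i) > 0) {
          for (task <- taskSet.resourceOffer(offers(i).executorId, offers(i).host,
                                             availableCpus(i), maxLocality)) {
            tasks(i) += task
            availableCpus(i) -= 1  // assume one CPU per task in this sketch
            launchedTask = true
          }
        }
      } while (launchedTask)
    }
    tasks
  }
}

The key property is the order of the two outer generators: a TaskSet is offered every worker at PROCESS_LOCAL before it is ever asked about NODE_LOCAL, which is what lets it launch local tasks across all offers instead of falling back to less-local tasks on the first offer it sees.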
Matei Zaharia 2013-08-15 17:22:49 -07:00
Parent 222c897128
Commit 2a4ed10210
7 changed files with 68 additions and 23 deletions


@@ -184,27 +184,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }

-    // Build a list of tasks to assign to each slave
+    // Build a list of tasks to assign to each worker
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = offers.map(o => o.cores).toArray
-    val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (manager <- sortedTaskSetQueue) {
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
-        manager.parent.name, manager.name, manager.runningTasks))
+        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
     }

+    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+    // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
-    for (manager <- sortedTaskSetQueue; offer <- offers) {
+    for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
         for (i <- 0 until offers.size) {
           val execId = offers(i).executorId
           val host = offers(i).host
-          for (task <- manager.resourceOffer(execId, host, availableCpus(i))) {
+          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
-            taskIdToTaskSetId(tid) = manager.taskSet.id
-            taskSetTaskIds(manager.taskSet.id) += tid
+            taskIdToTaskSetId(tid) = taskSet.taskSet.id
+            taskSetTaskIds(taskSet.taskSet.id) += tid
             taskIdToExecutorId(tid) = execId
             activeExecutorIds += execId
             executorsByHost(host) += execId
@@ -402,8 +404,7 @@ object ClusterScheduler {
     // order keyList based on population of value in map
     val keyList = _keyList.sortWith(
-      // TODO(matei): not sure why we're using getOrElse if keyList = map.keys... see if it matters
-      (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+      (left, right) => map(left).size > map(right).size
     )
     val retval = new ArrayBuffer[T](keyList.size * 2)


@@ -43,7 +43,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   extends TaskSetManager with Logging {

   // CPUs to request per task
-  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+  val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt

   // Maximum times a task is allowed to fail before failing the job
   val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -325,15 +325,22 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   /**
    * Respond to an offer of a single slave from the scheduler by finding a task
    */
-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val curTime = System.currentTimeMillis()
-      val locality = getAllowedLocalityLevel(curTime)
+      var allowedLocality = getAllowedLocalityLevel(curTime)
+      if (allowedLocality > maxLocality) {
+        allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
+      }
-      findTask(execId, host, locality) match {
+      findTask(execId, host, allowedLocality) match {
         case Some((index, taskLocality)) => {
           // Found a task; do some bookkeeping and return a task description
           val task = tasks(index)
@@ -347,7 +354,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
           taskInfos(taskId) = info
           taskAttempts(index) = info :: taskAttempts(index)
           // Update our locality level for delay scheduling
-          currentLocalityIndex = getLocalityIndex(locality)
+          currentLocalityIndex = getLocalityIndex(allowedLocality)
           lastLaunchTime = curTime
           // Serialize and return the task
           val startTime = System.currentTimeMillis()
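
The behavioral core of the resourceOffer change above is that the level picked by delay scheduling is now clamped to the maxLocality the scheduler is currently offering. A hypothetical standalone helper (names assumed for illustration, not Spark's API) that captures just that clamp:

// Sketch only: search for a task at the more restrictive (more local) of the two levels.
object LocalityClampSketch {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }

  def effectiveLocality(allowedByDelay: TaskLocality.Value,
                        maxLocality: TaskLocality.Value): TaskLocality.Value = {
    // Larger values are less local, so never search farther away than maxLocality allows.
    if (allowedByDelay > maxLocality) maxLocality else allowedByDelay
  }
}

This is what makes the per-level sweep in ClusterScheduler effective: even if a TaskSet's locality wait has already expired, it cannot take a less-local slot during a pass that is only offering more-local work.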


@@ -29,7 +29,11 @@ private[spark] trait TaskSetManager extends Schedulable {
   def taskSet: TaskSet

-  def resourceOffer(execId: String, hostPort: String, availableCpus: Double)
+  def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription]

   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)


@@ -141,8 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     for (manager <- sortedTaskSetQueue) {
       do {
         launchTask = false
-        // TODO(matei): don't pass null here?
-        manager.resourceOffer(null, null, freeCpuCores) match {
+        manager.resourceOffer(null, null, freeCpuCores, null) match {
           case Some(task) =>
             tasks += task
             taskIdToTaskSetId(task.taskId) = manager.taskSet.id


@@ -98,7 +98,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
     return None
   }

-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     SparkEnv.set(sched.env)


@@ -72,7 +72,11 @@ class DummyTaskSetManager(
   override def executorLost(executorId: String, host: String): Unit = {
   }

-  override def resourceOffer(execId: String, host: String, availableCpus: Double)
+  override def resourceOffer(
+      execId: String,
+      host: String,
+      availableCpus: Int,
+      maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
     if (tasksFinished + runningTasks < numTasks) {
@@ -120,7 +124,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
       logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
     }
     for (taskSet <- taskSetQueue) {
-      taskSet.resourceOffer("execId_1", "hostname_1", 1) match {
+      taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
         case Some(task) =>
           return taskSet.stageId
         case None => {}


@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
   <td>3000</td>
   <td>
     Number of milliseconds to wait to launch a data-local task before giving up and launching it
-    in a non-data-local location. You should increase this if your tasks are long and you are seeing
-    poor data locality, but the default generally works well.
+    on a less-local node. The same wait will be used to step through multiple locality levels
+    (process-local, node-local, rack-local and then any). It is also possible to customize the
+    waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+    You should increase this setting if your tasks are long and see poor locality, but the
+    default usually works well.
   </td>
 </tr>
+<tr>
+  <td>spark.locality.wait.process</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for process locality. This affects tasks that attempt to access
+    cached data in a particular executor process.
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.node</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for node locality. For example, you can set this to 0 to skip
+    node locality and search immediately for rack locality (if your cluster has rack information).
+  </td>
+</tr>
+<tr>
+  <td>spark.locality.wait.rack</td>
+  <td>spark.locality.wait</td>
+  <td>
+    Customize the locality wait for rack locality.
+  </td>
+</tr>
 <tr>