Mirror of https://github.com/microsoft/spark.git
Various fixes to standalone mode and web UI:
- Don't report a job as finishing multiple times
- Don't show state of workers as LOADING when they're running
- Show start and finish times in web UI
- Sort web UI tables by ID and time by default
Parent: e2b8477487
Commit: bb1bce7924
@@ -67,8 +67,8 @@ private[spark] case object RequestMasterState
 
 // Master to MasterWebUI
 private[spark]
-case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
-  completedJobs: List[JobInfo])
+case class MasterState(uri: String, workers: Array[WorkerInfo], activeJobs: Array[JobInfo],
+  completedJobs: Array[JobInfo])
 
 // WorkerWebUI to Worker
 private[spark] case object RequestWorkerState

@@ -78,4 +78,4 @@ private[spark] case object RequestWorkerState
 private[spark]
 case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
   finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
-  coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
+  coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
@@ -0,0 +1,30 @@
+package spark.deploy
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+/**
+ * Utilities used throughout the web UI.
+ */
+private[spark] object WebUI {
+  val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+
+  def formatDate(date: Date): String = DATE_FORMAT.format(date)
+
+  def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
+
+  def formatDuration(milliseconds: Long): String = {
+    val seconds = milliseconds.toDouble / 1000
+    if (seconds < 60) {
+      return "%.0f s".format(seconds)
+    }
+    val minutes = seconds / 60
+    if (minutes < 10) {
+      return "%.1f min".format(minutes)
+    } else if (minutes < 60) {
+      return "%.0f min".format(minutes)
+    }
+    val hours = minutes / 60
+    return "%.1f h".format(hours)
+  }
+}
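A quick sketch (not part of the commit) of what the new formatDuration helper produces at each of its thresholds, evaluated script-style in the Scala REPL:

    import spark.deploy.WebUI

    WebUI.formatDuration(45 * 1000L)        // "45 s"    (under a minute)
    WebUI.formatDuration(5 * 60 * 1000L)    // "5.0 min" (under 10 minutes)
    WebUI.formatDuration(30 * 60 * 1000L)   // "30 min"  (under an hour)
    WebUI.formatDuration(2 * 3600 * 1000L)  // "2.0 h"   (an hour or more)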
@@ -5,11 +5,17 @@ import java.util.Date
 import akka.actor.ActorRef
 import scala.collection.mutable
 
-private[spark]
-class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark] class JobInfo(
+    val startTime: Long,
+    val id: String,
+    val desc: JobDescription,
+    val submitDate: Date,
+    val actor: ActorRef)
+{
   var state = JobState.WAITING
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
+  var endTime = -1L
 
   private var nextExecutorId = 0
 

@@ -41,4 +47,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
     _retryCount += 1
     _retryCount
   }
+
+  def markFinished(endState: JobState.Value) {
+    state = endState
+    endTime = System.currentTimeMillis()
+  }
+
+  def duration: Long = {
+    if (endTime != -1) {
+      endTime - startTime
+    } else {
+      System.currentTimeMillis() - startTime
+    }
+  }
 }
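The start/finish bookkeeping above is a simple stopwatch pattern: endTime of -1L means "still running", so duration stays live until markFinished freezes it. A self-contained sketch (not part of the commit) of the same idea:

    // Stand-in for JobInfo's timing fields, reduced to the essentials.
    class Stopwatch {
      val startTime = System.currentTimeMillis()
      var endTime = -1L  // -1 marks "still running"

      def markFinished() { endTime = System.currentTimeMillis() }

      // Live duration while running; frozen once markFinished is called.
      def duration: Long =
        if (endTime != -1) endTime - startTime
        else System.currentTimeMillis() - startTime
    }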
@@ -123,7 +123,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     }
 
     case RequestMasterState => {
-      sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+      sender ! MasterState(ip + ":" + port, workers.toArray, jobs.toArray, completedJobs.toArray)
     }
   }
 

@@ -179,8 +179,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
-    val date = new Date
-    val job = new JobInfo(newJobId(date), desc, date, actor)
+    val now = System.currentTimeMillis()
+    val date = new Date(now)
+    val job = new JobInfo(now, newJobId(date), desc, date, actor)
     jobs += job
     idToJob(job.id) = job
     actorToJob(sender) = job

@@ -189,19 +190,21 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   }
 
   def removeJob(job: JobInfo) {
-    logInfo("Removing job " + job.id)
-    jobs -= job
-    idToJob -= job.id
-    actorToJob -= job.actor
-    addressToWorker -= job.actor.path.address
-    completedJobs += job   // Remember it in our history
-    waitingJobs -= job
-    for (exec <- job.executors.values) {
-      exec.worker.removeExecutor(exec)
-      exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+    if (jobs.contains(job)) {
+      logInfo("Removing job " + job.id)
+      jobs -= job
+      idToJob -= job.id
+      actorToJob -= job.actor
+      addressToWorker -= job.actor.path.address
+      completedJobs += job   // Remember it in our history
+      waitingJobs -= job
+      for (exec <- job.executors.values) {
+        exec.worker.removeExecutor(exec)
+        exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
+      }
+      job.markFinished(JobState.FINISHED)  // TODO: Mark it as FAILED if it failed
+      schedule()
     }
-    job.state = JobState.FINISHED
-    schedule()
   }
 
   /** Generate a new job ID given a job's submission date */
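The jobs.contains(job) guard is what stops a job from being reported as finished multiple times: a second removeJob call for the same job is now a no-op, so the job lands in completedJobs exactly once. A self-contained, script-style sketch (not part of the commit) of the pattern:

    import scala.collection.mutable

    val jobs = new mutable.HashSet[String]
    val completedJobs = new mutable.ArrayBuffer[String]

    def removeJob(id: String) {
      if (jobs.contains(id)) {  // same guard as Master.removeJob
        jobs -= id
        completedJobs += id     // remembered in history exactly once
      }
    }

    jobs += "job-0001"
    removeJob("job-0001")
    removeJob("job-0001")       // duplicate removal: does nothing
    assert(completedJobs.size == 1)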
@@ -36,7 +36,7 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Direct
 
         // A bit ugly an inefficient, but we won't have a number of jobs
         // so large that it will make a significant difference.
-        (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+        (masterState.activeJobs ++ masterState.completedJobs).find(_.id == jobId) match {
           case Some(job) => spark.deploy.master.html.job_details.render(job)
           case _ => null
         }
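The operator change here follows from the List-to-Array switch in MasterState: ::: is a List-only concatenation method, while ++ also works on Arrays. A sketch (not part of the commit) of the difference:

    val active = Array("job-1", "job-2")
    val completed = Array("job-0")
    // active ::: completed                    // does not compile: ::: is a List method
    (active ++ completed).find(_ == "job-0")   // Some(job-0)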
@@ -123,7 +123,7 @@ private[spark] class Worker(
       manager.start()
       coresUsed += cores_
       memoryUsed += memory_
-      master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+      master ! ExecutorStateChanged(jobId, execId, ExecutorState.RUNNING, None)
 
     case ExecutorStateChanged(jobId, execId, state, message) =>
       master ! ExecutorStateChanged(jobId, execId, state, message)
@@ -1,5 +1,6 @@
 @(state: spark.deploy.MasterState)
 @import spark.deploy.master._
+@import spark.Utils
 
 @spark.deploy.common.html.layout(title = "Spark Master on " + state.uri) {
 

@@ -8,9 +9,11 @@
     <div class="span12">
       <ul class="unstyled">
         <li><strong>URL:</strong> spark://@(state.uri)</li>
-        <li><strong>Number of Workers:</strong> @state.workers.size </li>
-        <li><strong>Cores:</strong> @state.workers.map(_.cores).sum Total, @state.workers.map(_.coresUsed).sum Used</li>
-        <li><strong>Memory:</strong> @state.workers.map(_.memory).sum Total, @state.workers.map(_.memoryUsed).sum Used</li>
+        <li><strong>Workers:</strong> @state.workers.size </li>
+        <li><strong>Cores:</strong> @{state.workers.map(_.cores).sum} Total,
+          @{state.workers.map(_.coresUsed).sum} Used</li>
+        <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
+          @{Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
         <li><strong>Jobs:</strong> @state.activeJobs.size Running, @state.completedJobs.size Completed </li>
       </ul>
     </div>

@@ -21,7 +24,7 @@
     <div class="span12">
       <h3> Cluster Summary </h3>
       <br/>
-      @worker_table(state.workers)
+      @worker_table(state.workers.sortBy(_.id))
     </div>
   </div>
 

@@ -32,7 +35,7 @@
     <div class="span12">
       <h3> Running Jobs </h3>
       <br/>
-      @job_table(state.activeJobs)
+      @job_table(state.activeJobs.sortBy(_.startTime).reverse)
     </div>
   </div>
 

@@ -43,7 +46,7 @@
     <div class="span12">
       <h3> Completed Jobs </h3>
       <br/>
-      @job_table(state.completedJobs)
+      @job_table(state.completedJobs.sortBy(_.endTime).reverse)
     </div>
   </div>
 
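These default orderings list workers by ID and jobs newest-first. A sketch (not part of the commit) of the job-table sort, with epoch-millisecond start times as in JobInfo:

    case class Job(id: String, startTime: Long)

    val jobs = Array(Job("job-1", 1000L), Job("job-3", 3000L), Job("job-2", 2000L))
    jobs.sortBy(_.startTime).reverse.map(_.id)  // Array(job-3, job-2, job-1)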
@@ -1,5 +1,9 @@
 @(job: spark.deploy.master.JobInfo)
 
+@import spark.Utils
+@import spark.deploy.WebUI.formatDate
+@import spark.deploy.WebUI.formatDuration
+
 <tr>
   <td>
     <a href="job?jobId=@(job.id)">@job.id</a>

@@ -13,8 +17,9 @@
       , @job.coresLeft
     }
   </td>
-  <td>@job.desc.memoryPerSlave</td>
-  <td>@job.submitDate</td>
+  <td>@Utils.memoryMegabytesToString(job.desc.memoryPerSlave)</td>
+  <td>@formatDate(job.submitDate)</td>
   <td>@job.desc.user</td>
   <td>@job.state.toString()</td>
+  <td>@formatDuration(job.duration)</td>
 </tr>
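Several templates now format memory through Utils.memoryMegabytesToString instead of printing a bare number. A hedged sketch (not part of the commit; the real Utils helper may differ in rounding and unit breakpoints) of the assumed behavior, turning a megabyte count into a human-readable string:

    // Assumption: MB below 1024, GB above, one decimal place.
    def memoryMegabytesToString(megabytes: Long): String = {
      if (megabytes >= 1024) "%.1f GB".format(megabytes / 1024.0)
      else "%.1f MB".format(megabytes.toDouble)
    }

    memoryMegabytesToString(512)    // "512.0 MB"
    memoryMegabytesToString(65536)  // "64.0 GB"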
@@ -1,4 +1,4 @@
-@(jobs: List[spark.deploy.master.JobInfo])
+@(jobs: Array[spark.deploy.master.JobInfo])
 
 <table class="table table-bordered table-striped table-condensed sortable">
   <thead>

@@ -6,10 +6,11 @@
       <th>JobID</th>
       <th>Description</th>
       <th>Cores</th>
-      <th>Memory per Slave</th>
-      <th>Submit Date</th>
+      <th>Memory per Node</th>
+      <th>Submit Time</th>
       <th>User</th>
       <th>State</th>
+      <th>Duration</th>
     </tr>
   </thead>
   <tbody>

@@ -17,4 +18,4 @@
       @job_row(j)
     }
   </tbody>
-</table>
+</table>
@@ -1,11 +1,13 @@
 @(worker: spark.deploy.master.WorkerInfo)
 
+@import spark.Utils
+
 <tr>
   <td>
     <a href="http://@worker.host:@worker.webUiPort">@worker.id</href>
   </td>
   <td>@{worker.host}:@{worker.port}</td>
   <td>@worker.cores (@worker.coresUsed Used)</td>
-  <td>@{spark.Utils.memoryMegabytesToString(worker.memory)}
-    (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
+  <td>@{Utils.memoryMegabytesToString(worker.memory)}
+    (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</td>
 </tr>
@@ -1,4 +1,4 @@
-@(workers: List[spark.deploy.master.WorkerInfo])
+@(workers: Array[spark.deploy.master.WorkerInfo])
 
 <table class="table table-bordered table-striped table-condensed sortable">
   <thead>

@@ -14,4 +14,4 @@
       @worker_row(w)
     }
   </tbody>
-</table>
+</table>
@@ -1,20 +1,20 @@
 @(executor: spark.deploy.worker.ExecutorRunner)
 
+@import spark.Utils
+
 <tr>
   <td>@executor.execId</td>
   <td>@executor.cores</td>
-  <td>@executor.memory</td>
+  <td>@Utils.memoryMegabytesToString(executor.memory)</td>
   <td>
     <ul class="unstyled">
       <li><strong>ID:</strong> @executor.jobId</li>
       <li><strong>Name:</strong> @executor.jobDesc.name</li>
       <li><strong>User:</strong> @executor.jobDesc.user</li>
       <li><strong>Cores:</strong> @executor.jobDesc.cores </li>
       <li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
     </ul>
   </td>
   <td>
     <a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
     <a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
   </td>
-</tr>
+</tr>
@@ -1,5 +1,7 @@
 @(worker: spark.deploy.WorkerState)
 
+@import spark.Utils
+
 @spark.deploy.common.html.layout(title = "Spark Worker on " + worker.uri) {
 
   <!-- Worker Details -->

@@ -12,8 +14,8 @@
         (WebUI at <a href="@worker.masterWebUiUrl">@worker.masterWebUiUrl</a>)
       </li>
       <li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
-      <li><strong>Memory:</strong> @{spark.Utils.memoryMegabytesToString(worker.memory)}
-        (@{spark.Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
+      <li><strong>Memory:</strong> @{Utils.memoryMegabytesToString(worker.memory)}
+        (@{Utils.memoryMegabytesToString(worker.memoryUsed)} Used)</li>
     </ul>
   </div>
 </div>