Refactor Yarn container log fetching code

Wei Zhang 2019-08-28 17:13:51 +08:00 committed by Zhang Wei
Parent 5818ce5eca
Commit 51b4b0c466
8 changed files with 509 additions and 249 deletions

View file

@@ -46,7 +46,7 @@ public class HdiSparkBatch extends LivySparkBatch implements SparkLogFetcher, De
}
@Override
public Observable<Pair<String, Long>> fetch(final String type,
public Observable<String> fetch(final String type,
final long logOffset,
final int size) {
return getDriverLogFetcherDelegate()

View file

@@ -3,9 +3,8 @@
package com.microsoft.azure.spark.tools.job;
import com.microsoft.azure.spark.tools.utils.Pair;
import rx.Observable;
public interface SparkLogFetcher {
Observable<Pair<String, Long>> fetch(String type, long logOffset, int size);
Observable<String> fetch(String type, long logOffset, int size);
}
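
A standalone sketch of an implementor under the new contract (illustrative only, not part of this commit): the fetcher now emits raw log text, and offset bookkeeping moves to the caller. It compiles against the SparkLogFetcher interface above and RxJava 1.x.

import rx.Observable;

// Toy implementor of the new Observable<String> contract; the canned line
// stands in for a real Yarn log page fetch.
public class CannedLogFetcher implements SparkLogFetcher {
    @Override
    public Observable<String> fetch(String type, long logOffset, int size) {
        return Observable.just("19/05/09 03:16:41 INFO SparkContext: ...\n");
    }

    public static void main(String[] args) {
        new CannedLogFetcher().fetch("stderr", 0L, -1)
                .subscribe(System.out::print);
    }
}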

View file

@@ -3,15 +3,9 @@
package com.microsoft.azure.spark.tools.job;
import com.microsoft.azure.spark.tools.clusters.YarnCluster;
import com.microsoft.azure.spark.tools.http.HttpObservable;
import com.microsoft.azure.spark.tools.log.Logger;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttemptsResponse;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppResponse;
import com.microsoft.azure.spark.tools.utils.Pair;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.message.BasicNameValuePair;
@@ -20,58 +14,88 @@ import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.nodes.Node;
import org.jsoup.select.Elements;
import rx.Observable;
import rx.subjects.PublishSubject;
import com.microsoft.azure.spark.tools.clusters.YarnCluster;
import com.microsoft.azure.spark.tools.http.HttpObservable;
import com.microsoft.azure.spark.tools.http.HttpResponse;
import com.microsoft.azure.spark.tools.log.Logger;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttemptsResponse;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppResponse;
import com.microsoft.azure.spark.tools.utils.Pair;
import com.microsoft.azure.spark.tools.utils.UriUtils;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownServiceException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static rx.exceptions.Exceptions.propagate;
/**
* This class supports fetching the Spark driver log from the Yarn application UI.
*/
public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
private static final Pattern LOG_TYPE_PATTERN = Pattern.compile("Log Type:\\s+(?<type>\\S+)");
private final HttpObservable http;
/**
* A {LogConversionMode} is a type of enum to present Yarn log UI URI combining ways.
* A {LogConversionMode} is an internal class representing one way of combining the Yarn log UI URI.
*/
private enum LogConversionMode {
UNKNOWN,
WITHOUT_PORT,
WITH_PORT,
ORIGINAL;
private class LogConversionMode {
static final String HOST = "HOST";
static final String PORT = "PORT";
static final String PATH = "PATH"; // Internal URI path, starting with slash `/`
static final String BASE = "BASE"; // Yarn UI Base URI, ending with slash `/`
static final String ORIGINAL = "ORIGINAL"; // Original internal URI
public static LogConversionMode next(final LogConversionMode current) {
List<LogConversionMode> modes = Arrays.asList(LogConversionMode.values());
private final String name;
private final String publicPathTemplate;
int found = modes.indexOf(current);
if (found + 1 >= modes.size()) {
throw new NoSuchElementException();
} else {
return modes.get(found + 1);
}
LogConversionMode(final String name, final String publicPathTemplate) {
this.name = name;
this.publicPathTemplate = publicPathTemplate;
}
URI toPublic(final URI internalLogUrl) {
final Map<String, String> values = ImmutableMap.of(
HOST, internalLogUrl.getHost(),
PORT, String.valueOf(internalLogUrl.getPort()),
PATH, Optional.of(internalLogUrl.getPath()).filter(StringUtils::isNoneBlank).orElse("/"),
BASE, UriUtils.normalizeWithSlashEnding(URI.create(getCluster().getYarnUIBaseUrl())).toString(),
ORIGINAL, internalLogUrl.toString());
final StrSubstitutor sub = new StrSubstitutor(values);
final String publicPath = sub.replace(publicPathTemplate);
return URI.create(publicPath);
}
}
private final Iterator<LogConversionMode> logConversionModes = Arrays.asList(
new LogConversionMode("WITHOUT_PORT", "${BASE}${HOST}${PATH}"),
new LogConversionMode("WITH_PORT", "${BASE}${HOST}/port/${PORT}${PATH}"),
new LogConversionMode("ORIGINAL", "${ORIGINAL}")
).iterator();
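
Each mode above is a StrSubstitutor template over the parts of the internal log URI, tried in declaration order: WITHOUT_PORT, then WITH_PORT, then ORIGINAL. A standalone sketch of the substitution with made-up host, port, and base URL values:

import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.text.StrSubstitutor;
import java.net.URI;
import java.util.Map;

public class LogUriTemplateDemo {
    public static void main(String[] args) {
        // An internal node-manager log URL of the kind the Yarn RM reports.
        URI internal = URI.create("http://10.0.0.6:30060/node/containerlogs/container_01/livy");
        Map<String, String> values = ImmutableMap.of(
                "HOST", internal.getHost(),
                "PORT", String.valueOf(internal.getPort()),
                "PATH", internal.getPath(),
                "BASE", "https://mycluster.azurehdinsight.net/yarnui/");
        // WITHOUT_PORT: ${BASE}${HOST}${PATH}
        System.out.println(new StrSubstitutor(values).replace("${BASE}${HOST}${PATH}"));
        // -> https://mycluster.azurehdinsight.net/yarnui/10.0.0.6/node/containerlogs/container_01/livy
        // WITH_PORT: ${BASE}${HOST}/port/${PORT}${PATH}
        System.out.println(new StrSubstitutor(values).replace("${BASE}${HOST}/port/${PORT}${PATH}"));
        // -> https://mycluster.azurehdinsight.net/yarnui/10.0.0.6/port/30060/node/containerlogs/container_01/livy
    }
}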
private final URI yarnNMConnectUri;
private @Nullable String currentLogUrl;
private LogConversionMode logUriConversionMode = LogConversionMode.UNKNOWN;
private final String applicationId;
private final YarnCluster cluster;
@@ -96,14 +120,6 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
this.currentLogUrl = currentLogUrl;
}
private LogConversionMode getLogUriConversionMode() {
return this.logUriConversionMode;
}
private void setLogUriConversionMode(final LogConversionMode mode) {
this.logUriConversionMode = mode;
}
/**
* Get the current Spark job Yarn application attempt log URI Observable.
*/
@@ -148,96 +164,49 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
}
private Observable<Boolean> isUriValid(final URI uriProbe) {
return getHttp()
.request(new HttpGet(uriProbe), null, null, null)
.map(response -> response.getStatusLine().getStatusCode() < 300);
}
private Optional<URI> convertToPublicLogUri(final LogConversionMode mode, final URI internalLogUrl) {
String normalizedPath = Optional.of(internalLogUrl.getPath()).filter(StringUtils::isNoneBlank)
.orElse("/");
URI yarnUiBase = URI.create(getCluster().getYarnUIBaseUrl()
+ (getCluster().getYarnUIBaseUrl().endsWith("/") ? "" : "/"));
switch (mode) {
case UNKNOWN:
return Optional.empty();
case WITHOUT_PORT:
return Optional.of(yarnUiBase.resolve(String.format("%s%s", internalLogUrl.getHost(), normalizedPath)));
case WITH_PORT:
return Optional.of(yarnUiBase.resolve(String.format("%s/port/%s%s",
internalLogUrl.getHost(), internalLogUrl.getPort(), normalizedPath)));
case ORIGINAL:
return Optional.of(internalLogUrl);
default:
throw new AssertionError("Unknown LogConversionMode, shouldn't be reached");
}
return getRequest(uriProbe)
.map(any -> true)
.onErrorReturn(err -> false);
}
private Observable<URI> convertToPublicLogUri(final URI internalLogUri) {
// New version, without port info in log URL
return this.convertToPublicLogUri(this.getLogUriConversionMode(), internalLogUri)
.map(Observable::just)
.orElseGet(() -> {
// Probe usable log URI
LogConversionMode probeMode = YarnContainerLogFetcher.this.getLogUriConversionMode();
while (this.logConversionModes.hasNext()) {
// Try next mode
final LogConversionMode probeMode = this.logConversionModes.next();
final URI uriProbe = probeMode.toPublic(internalLogUri);
boolean isNoMoreTry = false;
while (!isNoMoreTry) {
Optional<URI> uri = this.convertToPublicLogUri(probeMode, internalLogUri)
.filter(uriProbe -> isUriValid(uriProbe).toBlocking().firstOrDefault(false));
if (isUriValid(uriProbe).toBlocking().firstOrDefault(false)) {
// Find usable one
log().debug("The Yarn log URL conversion mode is {} with pattern {}",
probeMode.name, probeMode.publicPathTemplate);
if (uri.isPresent()) {
// Find usable one
YarnContainerLogFetcher.this.setLogUriConversionMode(probeMode);
return Observable.just(uri.get());
}
return Observable.just(uriProbe);
}
}
try {
probeMode = LogConversionMode.next(probeMode);
} catch (NoSuchElementException ignore) {
log().warn("Can't find conversion mode of Yarn " + getYarnNMConnectUri());
isNoMoreTry = true;
}
}
// All modes were probed and all failed
return Observable.empty();
});
// All modes were probed and all failed
log().warn("Can't find conversion mode of Yarn " + getYarnNMConnectUri());
return Observable.empty();
}
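
The rewritten probing loop above consumes one conversion mode per attempt, keeps the first candidate URI whose HTTP GET succeeds, and returns an empty Observable once the iterator is exhausted. A standalone sketch of that strategy with a stubbed validity check:

import rx.Observable;
import java.net.URI;
import java.util.Arrays;
import java.util.Iterator;

public class ProbeDemo {
    // Stub: pretend only the with-port form of the URL is reachable.
    static Observable<Boolean> isUriValid(URI probe) {
        return Observable.just(probe.getPath().contains("/port/"));
    }

    public static void main(String[] args) {
        Iterator<URI> candidates = Arrays.asList(
                URI.create("https://base/yarnui/10.0.0.6/node/logs"),
                URI.create("https://base/yarnui/10.0.0.6/port/30060/node/logs")).iterator();
        while (candidates.hasNext()) {
            URI probe = candidates.next();
            if (isUriValid(probe).toBlocking().firstOrDefault(false)) {
                System.out.println("Usable log URI: " + probe);
                return;
            }
        }
        System.out.println("All conversion modes failed");
    }
}

Because logConversionModes is an instance field, a mode that fails its probe is never retried for later fetches from the same fetcher.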
@Override
public Observable<Pair<String, Long>> fetch(final String type, final long logOffset, final int size) {
public Observable<String> fetch(final String type, final long logOffset, final int size) {
return this.getSparkJobDriverLogUrl()
.map(Object::toString)
.flatMap(logUrl -> {
long offset = logOffset;
final long offset = StringUtils.equalsIgnoreCase(logUrl, this.getCurrentLogUrl())
? logOffset
: 0L;
if (!StringUtils.equalsIgnoreCase(logUrl, this.getCurrentLogUrl())) {
this.setCurrentLogUrl(logUrl);
offset = 0L;
}
this.setCurrentLogUrl(logUrl);
String probedLogUrl = this.getCurrentLogUrl();
if (probedLogUrl == null) {
return Observable.empty();
}
String logGot = this.getInformationFromYarnLogDom(probedLogUrl, type, offset, size);
if (StringUtils.isEmpty(logGot)) {
return getYarnApplicationRequest()
.flatMap(app -> {
if (isLogFetchable(app.getState())) {
return Observable.empty();
} else {
return Observable.just(Pair.of("", -1L));
}
});
} else {
return Observable.just(Pair.of(logGot, offset));
}
});
return this.getContentFromYarnLogDom(logUrl, type, offset, size);
})
.repeatWhen(completed -> getYarnApplicationRequest()
.filter(app -> isLogFetchable(app.getState()))
.flatMap(app -> completed)
.delay(1, TimeUnit.SECONDS))
.first();
}
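
The new fetch() turns an empty page into a poll: when the DOM yields no content and the Yarn application is still in a fetchable state, repeatWhen resubscribes after a one-second delay, and first() stops at the first non-empty slice. A standalone sketch of that repeatWhen pattern (the canned source stands in for the page fetch; the application-state gate is omitted):

import rx.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RepeatWhenPollDemo {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        String slice = Observable.defer(() -> attempts.incrementAndGet() < 3
                        ? Observable.<String>empty()      // page has no log content yet
                        : Observable.just("log slice"))
                .repeatWhen(completions -> completions.delay(1, TimeUnit.SECONDS))
                .toBlocking()
                .first();
        System.out.println(slice + " after " + attempts.get() + " attempts");
    }
}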
private boolean isLogFetchable(final String status) {
@@ -292,87 +261,77 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
}
}
private String getInformationFromYarnLogDom(final String baseUrl,
final String type,
final long start,
final int size) {
URI url = URI.create(StringUtils.stripEnd(baseUrl, "/") + "/").resolve(
String.format("%s?start=%d", type, start) + (size <= 0 ? "" : String.format("&&end=%d", start + size)));
private Observable<String> getContentFromYarnLogDom(final String baseUrl,
final String type,
final long start,
final int size) {
final URI url = UriUtils.normalizeWithSlashEnding(baseUrl).resolve(type);
List<NameValuePair> params = new ArrayList<>();
final List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("start", Long.toString(start)));
if (size > 0) {
params.add(new BasicNameValuePair("size", Long.toString(size)));
}
String typedLogs = getHttp()
.requestWithHttpResponse(new HttpGet(url), null, params, null)
.doOnNext(response -> {
try {
log().debug("Fetch log from " + url + ", got " + response.getCode()
+ " with " + response.getMessage());
} catch (IOException ignored) {
// The upstream requestWithHttpResponse() has already buffered the message.
}
})
return getRequest(url, params)
.map(response -> {
Document doc = null;
try {
doc = Jsoup.parse(response.getMessage());
return response.getMessage();
} catch (IOException ignored) {
// The upstream requestWithHttpResponse() has already buffered the message.
throw new AssertionError("The upstream has got messages.");
throw propagate(new AssertionError("The upstream has got messages."));
}
Iterator<Element> iterator = Optional.ofNullable(doc.getElementById("navcell"))
.map(Element::nextElementSibling)
.map(Element::children)
.map(ArrayList::iterator)
.orElse(Collections.emptyIterator());
HashMap<String, String> logTypeMap = new HashMap<>();
final AtomicReference<String> logType = new AtomicReference<>();
String logs = "";
while (iterator.hasNext()) {
Element node = iterator.next();
if (StringUtils.equalsIgnoreCase(node.tagName(), "p")) {
// In history server, need to read log type paragraph in page
final Pattern logTypePattern = Pattern.compile("Log Type:\\s+(\\S+)");
node.childNodes().stream()
.findFirst()
.map(Node::toString)
.map(StringUtils::trim)
.map(logTypePattern::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(1))
.ifPresent(logType::set);
} else if (StringUtils.equalsIgnoreCase(node.tagName(), "pre")) {
// In running, no log type paragraph in page
logs = node.childNodes().stream()
.findFirst()
.map(Node::toString)
.orElse("");
if (logType.get() != null) {
// Only get the first <pre>...</pre>
logTypeMap.put(logType.get(), logs);
logType.set(null);
}
}
}
return logTypeMap.getOrDefault(type, logs);
})
.doOnError(err -> log().warn("Can't parse information from YarnUI log page " + url, err))
.toBlocking()
.firstOrDefault("");
.flatMap(html -> {
final String logs = parseLogsFromHtml(html).getOrDefault(type, StringUtils.EMPTY);
return typedLogs;
return StringUtils.isEmpty(logs) ? Observable.empty() : Observable.just(logs);
})
.doOnError(err -> log().warn("Can't parse information from YarnUI log page " + url, err));
}
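
With normalizeWithSlashEnding the request URL resolves to <base>/<type>, and start/size travel as query parameters. A rough sketch of the resulting URL, assuming the HTTP layer appends the name-value pairs as a query string (host and container names are made up):

import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.message.BasicNameValuePair;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class LogUrlDemo {
    public static void main(String[] args) throws Exception {
        URI base = UriUtils.normalizeWithSlashEnding(
                "http://10.0.0.6:30060/node/containerlogs/container_01/livy");
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("start", "0"));
        params.add(new BasicNameValuePair("size", "4096"));
        URI full = new URIBuilder(base.resolve("stderr")).addParameters(params).build();
        System.out.println(full);
        // -> http://10.0.0.6:30060/node/containerlogs/container_01/livy/stderr?start=0&size=4096
    }
}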
Map<String, String> parseLogsFromHtml(final String webPage) {
final Document doc = Jsoup.parse(webPage);
final Elements elements = Optional.ofNullable(doc.getElementById("navcell"))
.map(Element::nextElementSibling)
.map(Element::children)
.orElse(null);
if (elements == null) {
return emptyMap();
}
// Subject for log type found
final PublishSubject<String> logTypeWindowOpenings = PublishSubject.create();
return Observable.from(elements)
.doOnNext(node -> {
if (StringUtils.equalsIgnoreCase(node.tagName(), "p")) {
// In history server, need to read log type paragraph in page
node.childNodes().stream()
.findFirst()
.map(this::findLogTypeDomNode)
.ifPresent(logTypeWindowOpenings::onNext);
}
})
.withLatestFrom(logTypeWindowOpenings, Pair::of)
.filter(nodeWithType ->
// The log content starts from `pre`
StringUtils.equalsIgnoreCase(nodeWithType.getFirst().tagName(), "pre")
&& !nodeWithType.getFirst().childNodes().isEmpty())
// Only take the first `<pre>` element
.doOnNext(nodeWithType -> logTypeWindowOpenings.onNext(nodeWithType.getRight() + "-ending"))
.toMap(Pair::getSecond, nodeWithType ->
// Get the content as log
String.valueOf(nodeWithType.getFirst().childNodes().get(0)))
.toBlocking()
.firstOrDefault(emptyMap());
}
private @Nullable String findLogTypeDomNode(Node node) {
final Matcher matcher = LOG_TYPE_PATTERN.matcher(node.toString().trim());
return matcher.matches() ? matcher.group("type") : null;
}
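
parseLogsFromHtml pairs each non-empty <pre> block with the most recent "Log Type:" paragraph via the PublishSubject and withLatestFrom above. A simplified, loop-based equivalent of that pairing (the HTML snippet is a trimmed stand-in for a page like the YarnContainerLog01.html fixture below):

import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class YarnLogPageParseDemo {
    private static final Pattern LOG_TYPE = Pattern.compile("Log Type:\\s+(?<type>\\S+)");

    public static void main(String[] args) {
        String page = "<table><tr><td id=\"navcell\"></td><td class=\"content\">"
                + "<p>Log Type: stdout</p><p>Log Length: 5</p><pre>hello</pre></td></tr></table>";
        Document doc = Jsoup.parse(page);
        Map<String, String> logsByType = new HashMap<>();
        String currentType = null;
        for (Element node : doc.getElementById("navcell").nextElementSibling().children()) {
            if ("p".equalsIgnoreCase(node.tagName()) && !node.childNodes().isEmpty()) {
                Matcher m = LOG_TYPE.matcher(node.childNodes().get(0).toString().trim());
                if (m.matches()) {
                    currentType = m.group("type"); // open a window for this log type
                }
            } else if ("pre".equalsIgnoreCase(node.tagName())
                    && currentType != null && !node.childNodes().isEmpty()) {
                logsByType.put(currentType, node.childNodes().get(0).toString());
                currentType = null; // only the first <pre> after the paragraph counts
            }
        }
        System.out.println(logsByType); // {stdout=hello}
    }
}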
public final String getApplicationId() {
@@ -396,7 +355,7 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
return getHttp()
// .withUuidUserAgent()
.get(uri.toString(), null, null, AppResponse.class)
.get(uri.toString(), emptyList(), emptyList(), AppResponse.class)
.map(Pair::getFirst)
.map(AppResponse::getApp);
}
@@ -406,8 +365,25 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
return getHttp()
// .withUuidUserAgent()
.get(uri.toString(), null, null, AppAttemptsResponse.class)
.get(uri.toString(), emptyList(), emptyList(), AppAttemptsResponse.class)
.map(Pair::getFirst)
.map(appAttemptsResponse -> appAttemptsResponse.getAppAttempts().appAttempt);
}
private Observable<HttpResponse> getRequest(URI url, List<NameValuePair> params) {
return getHttp()
.requestWithHttpResponse(new HttpGet(url), null, params, emptyList())
.doOnNext(response -> {
try {
log().debug("Get page from " + url + ", got " + response.getCode()
+ " with " + response.getMessage());
} catch (IOException ignored) {
// The upstream requestWithHttpResponse() has already buffered the message.
}
});
}
private Observable<HttpResponse> getRequest(URI url) {
return getRequest(url, emptyList());
}
}

View file

@@ -6,15 +6,11 @@ package com.microsoft.azure.spark.tools.processes;
import com.microsoft.azure.spark.tools.job.SparkLogFetcher;
import com.microsoft.azure.spark.tools.utils.LaterInit;
import com.microsoft.azure.spark.tools.utils.Pair;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import static java.lang.Thread.sleep;
public class SparkJobLogInputStream extends InputStream {
private String logType;
private LaterInit<SparkLogFetcher> sparkLogFetcher = new LaterInit<>();
@@ -35,24 +31,6 @@ public class SparkJobLogInputStream extends InputStream {
return fetcher;
}
private synchronized @Nullable Pair<String, Long> fetchLog(final long logOffset, final int fetchSize) {
try {
if (!isClosed && sparkLogFetcher.isInitialized()) {
return getAttachedLogFetcher()
.fetch(getLogType(), logOffset, fetchSize)
.toBlocking()
.first();
}
} catch (NoSuchElementException ignored) {
}
return null;
}
public SparkLogFetcher getAttachedLogFetcher() {
return sparkLogFetcher.get();
}
@Override
public int read() throws IOException {
if (isClosed) {
@@ -60,55 +38,23 @@ public class SparkJobLogInputStream extends InputStream {
}
if (bufferPos >= buffer.length) {
// throw new IOException("Beyond the buffer end, needs a new log fetch");
int avail;
try {
final String logSlice = sparkLogFetcher.observable()
.flatMap(fetcher -> fetcher.fetch(getLogType(), offset, -1))
.toBlocking()
.first();
do {
avail = available();
if (avail == -1) {
return -1;
}
try {
sleep(1000);
} catch (InterruptedException ignore) {
return -1;
}
} while (avail == 0);
buffer = logSlice.getBytes();
bufferPos = 0;
offset += logSlice.length();
} catch (NoSuchElementException ignored) {
return -1;
}
}
return buffer[bufferPos++];
}
@Override
public int available() throws IOException {
if (isClosed) {
return -1;
}
if (bufferPos >= buffer.length) {
Pair<String, Long> sliceOffsetPair = fetchLog(offset, -1);
if (sliceOffsetPair == null) {
return 0;
}
if (sliceOffsetPair.getValue() == -1L) {
return -1;
}
buffer = sliceOffsetPair.getKey().getBytes();
bufferPos = 0;
offset = sliceOffsetPair.getValue() + sliceOffsetPair.getKey().length();
return buffer.length;
} else {
return buffer.length - bufferPos;
}
}
public String getLogType() {
return logType;
}
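
With the offset gone from the fetcher's emissions, read() above blocks on the next fetch itself and advances its own offset; an empty Observable surfaces as NoSuchElementException from first(), which maps to end of stream. A standalone sketch of that stream-over-Observable idea (the Func1-based constructor is illustrative, not this class's real API):

import rx.Observable;
import rx.functions.Func1;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;

public class ObservableLogInputStream extends InputStream {
    private final Func1<Long, Observable<String>> fetchFromOffset;
    private byte[] buffer = new byte[0];
    private int bufferPos = 0;
    private long offset = 0;

    ObservableLogInputStream(Func1<Long, Observable<String>> fetchFromOffset) {
        this.fetchFromOffset = fetchFromOffset;
    }

    @Override
    public int read() throws IOException {
        if (bufferPos >= buffer.length) {
            try {
                // Blocks until the fetcher emits the next slice at the current offset.
                String slice = fetchFromOffset.call(offset).toBlocking().first();
                buffer = slice.getBytes();
                bufferPos = 0;
                offset += slice.length();
            } catch (NoSuchElementException done) {
                return -1; // the fetcher completed without a slice: end of stream
            }
        }
        return buffer[bufferPos++] & 0xFF;
    }

    public static void main(String[] args) throws IOException {
        // Canned fetcher: one slice at offset 0, then completion.
        InputStream in = new ObservableLogInputStream(off ->
                off == 0 ? Observable.just("hello log\n") : Observable.<String>empty());
        for (int b = in.read(); b != -1; b = in.read()) {
            System.out.print((char) b);
        }
    }
}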

View file

@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.utils;
import java.net.URI;
import java.net.URISyntaxException;
public class UriUtils {
public static URI normalizeWithSlashEnding(final URI src) {
try {
return src.getPath().endsWith("/")
? src
: new URI(
src.getScheme(),
src.getAuthority(),
src.getPath() + "/",
src.getQuery(),
src.getFragment());
} catch (URISyntaxException x) {
throw new IllegalArgumentException(x.getMessage(), x);
}
}
public static URI normalizeWithSlashEnding(final String src) {
return normalizeWithSlashEnding(URI.create(src));
}
}
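
A quick usage note for the new helper (example host is made up): a trailing slash is appended to the URI path only when missing, so later resolve() calls treat the last segment as a directory.

import java.net.URI;

public class UriUtilsDemo {
    public static void main(String[] args) {
        System.out.println(UriUtils.normalizeWithSlashEnding("https://mycluster.example.net/yarnui"));
        // -> https://mycluster.example.net/yarnui/
        System.out.println(UriUtils.normalizeWithSlashEnding(URI.create("https://mycluster.example.net/yarnui/")));
        // -> https://mycluster.example.net/yarnui/ (already normalized, returned as-is)
    }
}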

View file

@@ -12,11 +12,17 @@ import cucumber.api.java.After;
import cucumber.api.java.Before;
import cucumber.api.java.en.Given;
import cucumber.api.java.en.Then;
import org.apache.commons.io.IOUtils;
import uk.org.lidalia.slf4jtest.TestLogger;
import uk.org.lidalia.slf4jtest.TestLoggerFactory;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -28,6 +34,7 @@ public class YarnContainerLogFetcherScenario {
private YarnCluster yarnClusterMock;
private YarnContainerLogFetcher yarnDriverLogFetcherMock;
private TestLogger logger = TestLoggerFactory.getTestLogger(YarnContainerLogFetcher.class);
private Map<String, String> logsByType = Collections.emptyMap();
@Before("@YarnContainerLogFetcherScenario")
public void setUp() throws Throwable {
@@ -102,4 +109,28 @@
public void gettingSparkJobDriverLogURLObservableShouldBeEmpty() throws Throwable {
assertTrue(yarnDriverLogFetcherMock.getSparkJobDriverLogUrl().isEmpty().toBlocking().last());
}
@Given("parse Yarn container log fetched from HTML page {string}")
public void parseYarnContainerLogFetchedFromHTMLPage(String webPageFileName) throws Throwable {
InputStream pageFileInput = getClass().getClassLoader().getResourceAsStream(
getClass().getPackage().getName().replace('.', File.separatorChar)
+ File.separator + webPageFileName);
String html = IOUtils.toString(pageFileInput, UTF_8);
logsByType = yarnDriverLogFetcherMock.parseLogsFromHtml(html);
}
@Then("check the type log {string} should start with {string}")
public void checkTheTypeLogDirectoryInfoShouldStartWithTemplate(String type, String expectStart) {
assertTrue("No such type log: " + type, logsByType.containsKey(type));
assertTrue("The type " + type + " log [" + logsByType.get(type).substring(0, 10)
+ "] didn't start with " + expectStart, logsByType.get(type).startsWith(expectStart));
}
@Then("parse Yarn container log fetched from HTML {string}")
public void parseYarnContainerLogFetchedFromHTML(String html) {
logsByType = yarnDriverLogFetcherMock.parseLogsFromHtml(html);
}
}

View file

@@ -0,0 +1,267 @@
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
<html>
<meta http-equiv="X-UA-Compatible" content="IE=8">
<meta http-equiv="Content-type" content="text/html; charset=UTF-8">
<title>
</title>
<link rel="stylesheet" href="/yarnui/static/yarn.css">
<style type="text/css">
#layout { height: 100%; }
#layout thead td { height: 3em; }
#layout #navcell { width: 11em; padding: 0 1em; }
#layout td.content { padding-top: 0 }
#layout tbody { vertical-align: top; }
#layout tfoot td { height: 4em; }
</style>
<link rel="stylesheet" href="/yarnui/static/jquery/themes-1.9.1/base/jquery-ui.css">
<link rel="stylesheet" href="/yarnui/static/dt-1.9.4/css/jui-dt.css">
<script type="text/javascript" src="/yarnui/static/jquery/jquery-1.8.2.min.js">
</script>
<script type="text/javascript" src="/yarnui/static/jquery/jquery-ui-1.9.1.custom.min.js">
</script>
<script type="text/javascript" src="/yarnui/static/dt-1.9.4/js/jquery.dataTables.min.js">
</script>
<script type="text/javascript" src="/yarnui/static/yarn.dt.plugins.js">
</script>
<script type="text/javascript" src="/yarnui/static/dt-sorting/natural.js">
</script>
<style type="text/css">
#jsnotice { padding: 0.2em; text-align: center; }
.ui-progressbar { height: 1em; min-width: 5em }
</style>
<script type="text/javascript">
$(function() {
$('#nav').accordion({autoHeight:false, active:0});
});
</script>
<div id="jsnotice" class="ui-state-error">
This page will not function without javascript enabled. Please enable javascript on your browser.
</div>
<script type="text/javascript">
$('#jsnotice').hide();
</script>
<table id="layout" class="ui-widget-content">
<thead>
<tr>
<td colspan="2">
<div id="header" class="ui-widget">
<div id="user">
Logged in as: dr.who
</div>
<div id="logo">
<img src="/yarnui/static/hadoop-st.png">
</div>
<h1>
</h1>
</div>
</td>
</tr>
</thead>
<tfoot>
<tr>
<td colspan="2">
<div id="footer" class="ui-widget">
</div>
</td>
</tr>
</tfoot>
<tbody>
<tr>
<td id="navcell">
<div id="nav">
<h3>
Application
</h3>
<ul>
<li>
<a href="/yarnui/jobhistory/about">About</a>
<li>
<a href="/yarnui/jobhistory/app">Jobs</a>
</ul>
<h3>
Tools
</h3>
<ul>
<li>
<a href="/yarnui/conf">Configuration</a>
<li>
<a href="/yarnui/logs">Local logs</a>
<li>
<a href="/yarnui/stacks">Server stacks</a>
<li>
<a href="/yarnui/jmx?qry=Hadoop:*">Server metrics</a>
</ul>
</div>
</td>
<td class="content">
<pre>
</pre>
<p>
Log Type: directory.info
<p>
Log Upload Time: Thu May 09 03:16:31 +0000 2019
<p>
Log Length: 6529
<p>
Showing 4096 bytes of 6529 total. Click
<a href="/yarnui/jobhistory/logs/wn2-zhwe-s.s4un4f0kknqu5fddjqm2rbsvkd.ax.internal.cloudapp.net/port/30050/container_e01_1555654226340_0115_01_000001/container_e01_1555654226340_0115_01_000001/livy/directory.info/?start=0">here</a>
for the full log.
<pre>.template
8652041 4 -r-x------ 1 yarn hadoop 2316 May 9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-client.xml.example
8652018 4 -r-x------ 1 yarn hadoop 1 May 9 03:16 ./__spark_conf__/__hadoop_conf__/dfs.exclude
8652039 4 -r-x------ 1 yarn hadoop 1602 May 9 03:16 ./__spark_conf__/__hadoop_conf__/health_check
8652033 12 -r-x------ 1 yarn hadoop 9083 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hdfs-site.xml
8652022 4 -r-x------ 1 yarn hadoop 744 May 9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-client.xml
8652017 4 -r-x------ 1 yarn hadoop 259 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2-azure-file-system.properties
8652015 4 -r-x------ 1 yarn hadoop 241 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2-adl-file-system.properties
8652034 4 -r-x------ 1 yarn hadoop 1020 May 9 03:16 ./__spark_conf__/__hadoop_conf__/commons-logging.properties
8652194 4 -r-x------ 1 yarn hadoop 945 May 9 03:16 ./__spark_conf__/__hadoop_conf__/taskcontroller.cfg
8652037 8 -r-x------ 1 yarn hadoop 4221 May 9 03:16 ./__spark_conf__/__hadoop_conf__/task-log4j.properties
8652193 8 -r-x------ 1 yarn hadoop 4113 May 9 03:16 ./__spark_conf__/__hadoop_conf__/mapred-queues.xml.template
8652040 4 -r-x------ 1 yarn hadoop 1195 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hive-site.xml
8652020 4 -r-x------ 1 yarn hadoop 244 May 9 03:16 ./__spark_conf__/__hadoop_conf__/spark-thrift-fairscheduler.xml
8652011 8 -r-x------ 1 yarn hadoop 5730 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-env.sh
8652031 4 -r-x------ 1 yarn hadoop 1335 May 9 03:16 ./__spark_conf__/__hadoop_conf__/configuration.xsl
8652014 4 -r-x------ 1 yarn hadoop 2830 May 9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2.properties
8652032 8 -r-x------ 1 yarn hadoop 4302 May 9 03:16 ./__spark_conf__/__hadoop_conf__/yarn-env.sh
8652012 8 -r-x------ 1 yarn hadoop 8070 May 9 03:16 ./__spark_conf__/__hadoop_conf__/mapred-site.xml
8652044 4 -r-x------ 1 yarn hadoop 219 May 9 03:16 ./__spark_conf__/__hadoop_conf__/topology_mappings.data
8652088 4 -r-x------ 1 yarn hadoop 2697 May 9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-server.xml.example
8652045 4 -r-x------ 1 yarn hadoop 1000 May 9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-server.xml
8652013 12 -r-x------ 1 yarn hadoop 10754 May 9 03:16 ./__spark_conf__/__hadoop_conf__/log4j.properties
8652028 4 -r-x------ 1 yarn hadoop 2444 May 9 03:16 ./__spark_conf__/__hadoop_conf__/capacity-scheduler.xml
8652016 28 -r-x------ 1 yarn hadoop 24964 May 9 03:16 ./__spark_conf__/__hadoop_conf__/yarn-site.xml
8652196 4 -r-x------ 1 yarn hadoop 3461 May 9 03:16 ./__spark_conf__/__spark_conf__.properties
8652008 4 -r-x------ 1 yarn hadoop 3635 May 9 03:16 ./__spark_conf__/log4j.properties
8652195 136 -r-x------ 1 yarn hadoop 135194 May 9 03:16 ./__spark_conf__/__spark_hadoop_conf__.xml
8652206 4 -rwx------ 1 yarn hadoop 759 May 9 03:16 ./default_container_executor.sh
8652203 4 -rw-r--r-- 1 yarn hadoop 60 May 9 03:16 ./.launch_container.sh.crc
8652199 4 drwx--x--- 2 yarn hadoop 4096 May 9 03:16 ./tmp
8652207 4 -rw-r--r-- 1 yarn hadoop 16 May 9 03:16 ./.default_container_executor.sh.crc
8652202 8 -rwx------ 1 yarn hadoop 6592 May 9 03:16 ./launch_container.sh
8652200 4 -rw-r--r-- 1 yarn hadoop 69 May 9 03:16 ./container_tokens
8652201 4 -rw-r--r-- 1 yarn hadoop 12 May 9 03:16 ./.container_tokens.crc
broken symlinks(find -L . -maxdepth 5 -type l -ls):
</pre>
<pre>
</pre>
<p>
Log Type: prelaunch.err
<p>
Log Upload Time: Thu May 09 03:16:31 +0000 2019
<p>
Log Length: 0
<pre></pre>
<pre>
</pre>
<p>
Log Type: stdout
<p>
Log Upload Time: Thu May 09 03:16:41 +0000 2019
<p>
Log Length: 83
<pre>(10.10.10.10,&quot;FRED&quot;,GET http://images.com/2013/Generic.jpg HTTP/1.1) bytes=621 n=2
</pre>
<pre>
</pre>
<p>
Log Type: stderr
<p>
Log Upload Time: Thu May 09 03:16:42 +0000 2019
<p>
Log Length: 42950
<p>
Showing 4096 bytes of 42950 total. Click
<a href="/yarnui/jobhistory/logs/wn2-zhwe-s.s4un4f0kknqu5fddjqm2rbsvkd.ax.internal.cloudapp.net/port/30050/container_e01_1555654226340_0115_01_000001/container_e01_1555654226340_0115_01_000001/livy/stderr/?start=0">here</a>
for the full log.
<pre>.0E-4, max=1.713311, mean=0.03784145849472171, stddev=0.24547936947579752, median=0.0015999999999999999, p75=0.002, p95=0.0050999999999999995, p98=1.713311, p99=1.713311, p999=1.713311, mean_rate=7.117206991134792, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.status.AppStatusListener, count=44, min=0.0024, max=31.023896999999998, mean=2.4547524526277567, stddev=5.876043399731473, median=0.204202, p75=1.60211, p95=14.707092999999999, p98=31.023896999999998, p99=31.023896999999998, p999=31.023896999999998, mean_rate=5.921080739448543, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.appStatus.listenerProcessingTime, count=44, min=0.0235, max=31.059396999999997, mean=2.4949896756423464, stddev=5.875936370492155, median=0.232602, p75=1.6312099999999998, p95=14.777693999999999, p98=31.059396999999997, p99=31.059396999999997, p999=31.059396999999997, mean_rate=5.919504362289072, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.eventLog.listenerProcessingTime, count=44, min=0.015, max=68.757137, mean=8.468596610427415, stddev=13.398995984218377, median=1.301309, p75=20.724231, p95=31.845902, p98=68.757137, p99=68.757137, p999=68.757137, mean_rate=7.096692322900792, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.executorManagement.listenerProcessingTime, count=44, min=0.015201, max=4.030926, mean=0.2928407880846266, stddev=0.8303335717416928, median=0.0339, p75=0.061001, p95=1.59211, p98=4.030926, p99=4.030926, p999=4.030926, mean_rate=6.589188634560831, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.shared.listenerProcessingTime, count=44, min=0.030601, max=95.799408, mean=3.1430500529808088, stddev=14.354981642887365, median=0.355602, p75=0.899106, p95=5.006031, p98=95.799408, p99=95.799408, p999=95.799408, mean_rate=7.112904958727032, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
19/05/09 03:16:41 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
19/05/09 03:16:41 INFO YarnClusterSchedulerBackend: Shutting down all executors
19/05/09 03:16:41 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/05/09 03:16:41 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
19/05/09 03:16:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/09 03:16:42 INFO MemoryStore: MemoryStore cleared
19/05/09 03:16:42 INFO BlockManager: BlockManager stopped
19/05/09 03:16:42 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/09 03:16:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/09 03:16:42 INFO SparkContext: Successfully stopped SparkContext
19/05/09 03:16:42 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
19/05/09 03:16:42 INFO ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
19/05/09 03:16:42 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
19/05/09 03:16:42 INFO ApplicationMaster: Deleting staging directory adl://home/user/livy/.sparkStaging/application_1555654226340_0115
19/05/09 03:16:42 INFO ShutdownHookManager: Shutdown hook called
19/05/09 03:16:42 INFO ShutdownHookManager: Deleting directory /mnt/resource/hadoop/yarn/local/usercache/livy/appcache/application_1555654226340_0115/spark-ddcde63e-131e-4470-a79a-4f9c43f7df53
</pre>
<pre>
</pre>
<p>
Log Type: launch_container.sh
<p>
Log Upload Time: Thu May 09 03:16:31 +0000 2019
<p>
Log Length: 6592
<p>
Showing 4096 bytes of 6592 total. Click
<a href="/yarnui/jobhistory/logs/wn2-zhwe-s.s4un4f0kknqu5fddjqm2rbsvkd.ax.internal.cloudapp.net/port/30050/container_e01_1555654226340_0115_01_000001/container_e01_1555654226340_0115_01_000001/livy/launch_container.sh/?start=0">here</a>
for the full log.
<pre>ework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/2.6.5.3006-29/hadoop/lib/hadoop-lzo-0.6.0.2.6.5.3006-29.jar:/etc/hadoop/conf/secure:/usr/lib/hdinsight-datalake/*::/usr/hdp/current/spark2-client/jars/*:/usr/lib/hdinsight-datalake/*:/usr/hdp/current/spark_llap/*:/usr/hdp/current/spark2-client/conf::$PWD/__spark_conf__/__hadoop_conf__&quot;
export HADOOP_TOKEN_FILE_LOCATION=&quot;/mnt/resource/hadoop/yarn/local/usercache/livy/appcache/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/container_tokens&quot;
export NM_AUX_SERVICE_spark_shuffle=&quot;&quot;
export SPARK_USER=&quot;livy&quot;
export LOCAL_USER_DIRS=&quot;/mnt/resource/hadoop/yarn/local/usercache/livy/&quot;
export HADOOP_HOME=&quot;/usr/hdp/2.6.5.3006-29/hadoop&quot;
export HOME=&quot;/home/&quot;
export NM_AUX_SERVICE_spark2_shuffle=&quot;&quot;
export CONTAINER_ID=&quot;container_e01_1555654226340_0115_01_000001&quot;
export MALLOC_ARENA_MAX=&quot;4&quot;
echo &quot;Setting up job resources&quot;
ln -sf &quot;/mnt/resource/hadoop/yarn/local/usercache/livy/filecache/306/__spark_conf__.zip&quot; &quot;__spark_conf__&quot;
ln -sf &quot;/mnt/resource/hadoop/yarn/local/usercache/livy/filecache/307/default_artifact.jar&quot; &quot;__app__.jar&quot;
echo &quot;Copying debugging information&quot;
# Creating copy of launch script
cp &quot;launch_container.sh&quot; &quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/launch_container.sh&quot;
chmod 640 &quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/launch_container.sh&quot;
# Determining directory contents
echo &quot;ls -l:&quot; 1&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
ls -l 1&gt;&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
echo &quot;find -L . -maxdepth 5 -ls:&quot; 1&gt;&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
find -L . -maxdepth 5 -ls 1&gt;&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
echo &quot;broken symlinks(find -L . -maxdepth 5 -type l -ls):&quot; 1&gt;&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
find -L . -maxdepth 5 -type l -ls 1&gt;&gt;&quot;/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info&quot;
echo &quot;Launching container&quot;
exec /bin/bash -c &quot;LD_LIBRARY_PATH=&quot;/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64:$LD_LIBRARY_PATH&quot; $JAVA_HOME/bin/java -server -Xmx1024m -Djava.io.tmpdir=$PWD/tmp -Dhdp.version=2.6.5.3006-29 '-Detwlogger.component=sparkdriver' '-DlogFilter.filename=SparkLogFilters.xml' '-DpatternGroup.filename=SparkPatternGroups.xml' '-Dlog4jspark.root.logger=INFO,console,RFA,ETW,Anonymizer' '-Dlog4jspark.log.dir=/var/log/sparkapp/\${user.name}' '-Dlog4jspark.log.file=sparkdriver.log' '-Dlog4j.configuration=file:/usr/hdp/current/spark2-client/conf/log4j.properties' '-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl' '-XX:+UseParallelGC' '-XX:+UseParallelOldGC' -Dspark.yarn.app.container.log.dir=/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'sample.LogQuery' --jar adl://zhwespkwesteu.azuredatalakestore.net/clusters/zhwe-spk23-adlsgen1/SparkSubmission/2019/04/28/4c017893-dd57-4a01-8bf9-593f002cd391/default_artifact.jar --properties-file $PWD/__spark_conf__/__spark_conf__.properties 1&gt; /mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/stdout 2&gt; /mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/stderr&quot;
</pre>
<pre>
</pre>
<p>
Log Type: prelaunch.out
<p>
Log Upload Time: Thu May 09 03:16:31 +0000 2019
<p>
Log Length: 100
<pre>Setting up env variables
Setting up job resources
Copying debugging information
Launching container
</pre>
</td>
</tr>
</tbody>
</table>
</html>

View file

@@ -36,3 +36,16 @@ Feature: YarnContainerLogFetcher unit tests
And create a yarn application driver with id application_1513565654634_0011
And setup a mock Yarn service for GET request '/yarnui/ws/v1/cluster/apps/application_1513565654634_0011/appattempts' to return '{"appAttempts":{"appAttempt":[{"id":1,"startTime":1513673984219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000001","nodeHttpAddress":"10.0.0.6:30060","nodeId":"10.0.0.6:30050","logsLink":"http://10.0.0.6:30060/node/containerlogs/container_1513565654634_0011_01_000001/livy","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000001"},{"id":2,"startTime":1513673985219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000002","nodeHttpAddress":"10.0.0.7:30060","nodeId":"10.0.0.7:30050","logsLink":"","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000002"}]}}' with status code 200
Then getting Spark Job driver log URL Observable should be empty
Scenario: parse logs from HTML page
Given prepare a Yarn cluster with Node Manager base URL http://127.0.0.1:$port/yarnui/ws/v1/cluster/apps/ and UI base URL http://127.0.0.1:$port/yarnui/
And create a yarn application driver with id mockId
And parse Yarn container log fetched from HTML page 'YarnContainerLog01.html'
Then check the type log 'directory.info' should start with '.template'
Then check the type log 'stdout' should start with '(10.10.10.10'
Then check the type log 'stderr' should start with '.0E-4, max=1.7133'
Scenario: parse logs from an empty HTML page
Given prepare a Yarn cluster with Node Manager base URL http://127.0.0.1:$port/yarnui/ws/v1/cluster/apps/ and UI base URL http://127.0.0.1:$port/yarnui/
And create a yarn application driver with id mockId
Then parse Yarn container log fetched from HTML ''