diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/HdiSparkBatch.java b/src/main/java/com/microsoft/azure/spark/tools/job/HdiSparkBatch.java
index 0727ea8..77fc815 100644
--- a/src/main/java/com/microsoft/azure/spark/tools/job/HdiSparkBatch.java
+++ b/src/main/java/com/microsoft/azure/spark/tools/job/HdiSparkBatch.java
@@ -46,7 +46,7 @@ public class HdiSparkBatch extends LivySparkBatch implements SparkLogFetcher, De
    }

    @Override
-    public Observable<Pair<String, Long>> fetch(final String type,
+    public Observable<String> fetch(final String type,
                                    final long logOffset,
                                    final int size) {
        return getDriverLogFetcherDelegate()
diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/SparkLogFetcher.java b/src/main/java/com/microsoft/azure/spark/tools/job/SparkLogFetcher.java
index f5f62ed..8b2519d 100644
--- a/src/main/java/com/microsoft/azure/spark/tools/job/SparkLogFetcher.java
+++ b/src/main/java/com/microsoft/azure/spark/tools/job/SparkLogFetcher.java
@@ -3,9 +3,8 @@

package com.microsoft.azure.spark.tools.job;

-import com.microsoft.azure.spark.tools.utils.Pair;
import rx.Observable;

public interface SparkLogFetcher {
-    Observable<Pair<String, Long>> fetch(String type, long logOffset, int size);
+    Observable<String> fetch(String type, long logOffset, int size);
}
\ No newline at end of file
diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcher.java b/src/main/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcher.java
index 9f312d2..bcd3a52 100644
--- a/src/main/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcher.java
+++ b/src/main/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcher.java
@@ -3,15 +3,9 @@

package com.microsoft.azure.spark.tools.job;

-import com.microsoft.azure.spark.tools.clusters.YarnCluster;
-import com.microsoft.azure.spark.tools.http.HttpObservable;
-import com.microsoft.azure.spark.tools.log.Logger;
-import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;
-import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
-import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttemptsResponse;
-import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppResponse;
-import com.microsoft.azure.spark.tools.utils.Pair;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.message.BasicNameValuePair;
@@ -20,58 +14,88 @@
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.nodes.Node;
+import org.jsoup.select.Elements;
import rx.Observable;
+import rx.subjects.PublishSubject;
+
+import com.microsoft.azure.spark.tools.clusters.YarnCluster;
+import com.microsoft.azure.spark.tools.http.HttpObservable;
+import com.microsoft.azure.spark.tools.http.HttpResponse;
+import com.microsoft.azure.spark.tools.log.Logger;
+import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;
+import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
+import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttemptsResponse;
+import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppResponse;
+import com.microsoft.azure.spark.tools.utils.Pair;
+import com.microsoft.azure.spark.tools.utils.UriUtils;

import java.io.IOException;
import java.net.URI;
import java.net.UnknownServiceException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

+import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
import static rx.exceptions.Exceptions.propagate;

/**
 * The class is to support fetching Spark Driver log from Yarn application UI.
 */
public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
+    private static final Pattern LOG_TYPE_PATTERN = Pattern.compile("Log Type:\\s+(?<type>\\S+)");
+
    private final HttpObservable http;

    /**
-     * A {LogConversionMode} is a type of enum to present Yarn log UI URI combining ways.
+     * A {@link LogConversionMode} describes one way to combine a public Yarn log UI URI
+     * from the parts of the internal log URL.
     */
-    private enum LogConversionMode {
-        UNKNOWN,
-        WITHOUT_PORT,
-        WITH_PORT,
-        ORIGINAL;
+    private class LogConversionMode {
+        static final String HOST = "HOST";
+        static final String PORT = "PORT";
+        static final String PATH = "PATH";          // Internal URI path, starting with slash `/`
+        static final String BASE = "BASE";          // Yarn UI Base URI, ending with slash `/`
+        static final String ORIGINAL = "ORIGINAL";  // Original internal URI

-        public static LogConversionMode next(final LogConversionMode current) {
-            List<LogConversionMode> modes = Arrays.asList(LogConversionMode.values());
+        private final String name;
+        private final String publicPathTemplate;

-            int found = modes.indexOf(current);
-            if (found + 1 >= modes.size()) {
-                throw new NoSuchElementException();
-            } else {
-                return modes.get(found + 1);
-            }
+        LogConversionMode(final String name, final String publicPathTemplate) {
+            this.name = name;
+            this.publicPathTemplate = publicPathTemplate;
+        }
+
+        URI toPublic(final URI internalLogUrl) {
+            final Map<String, String> values = ImmutableMap.of(
+                    HOST, internalLogUrl.getHost(),
+                    PORT, String.valueOf(internalLogUrl.getPort()),
+                    PATH, Optional.of(internalLogUrl.getPath()).filter(StringUtils::isNoneBlank).orElse("/"),
+                    BASE, UriUtils.normalizeWithSlashEnding(URI.create(getCluster().getYarnUIBaseUrl())).toString(),
+                    ORIGINAL, internalLogUrl.toString());
+            final StrSubstitutor sub = new StrSubstitutor(values);
+            final String publicPath = sub.replace(publicPathTemplate);
+
+            return URI.create(publicPath);
        }
    }

+    private final Iterator<LogConversionMode> logConversionModes = Arrays.asList(
+            new LogConversionMode("WITHOUT_PORT", "${BASE}${HOST}${PATH}"),
+            new LogConversionMode("WITH_PORT", "${BASE}${HOST}/port/${PORT}${PATH}"),
+            new LogConversionMode("ORIGINAL", "${ORIGINAL}")
+    ).iterator();
+
    private final URI yarnNMConnectUri;

    private @Nullable String currentLogUrl;
-    private LogConversionMode logUriConversionMode = LogConversionMode.UNKNOWN;
    private final String applicationId;
    private final YarnCluster cluster;
@@ -96,14 +120,6 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
        this.currentLogUrl = currentLogUrl;
    }

-    private LogConversionMode getLogUriConversionMode() {
-        return this.logUriConversionMode;
-    }
-
-    private void setLogUriConversionMode(final LogConversionMode mode) {
-        this.logUriConversionMode = mode;
-    }
-
    /**
     * Get the current Spark job Yarn application attempt log URI Observable.
     */
@@ -148,96 +164,49 @@
    }

    private Observable<Boolean> isUriValid(final URI uriProbe) {
-        return getHttp()
-                .request(new HttpGet(uriProbe), null, null, null)
-                .map(response -> response.getStatusLine().getStatusCode() < 300);
-    }
-
-    private Optional<URI> convertToPublicLogUri(final LogConversionMode mode, final URI internalLogUrl) {
-        String normalizedPath = Optional.of(internalLogUrl.getPath()).filter(StringUtils::isNoneBlank)
-                .orElse("/");
-        URI yarnUiBase = URI.create(getCluster().getYarnUIBaseUrl()
-                + (getCluster().getYarnUIBaseUrl().endsWith("/") ? "" : "/"));
-
-        switch (mode) {
-            case UNKNOWN:
-                return Optional.empty();
-            case WITHOUT_PORT:
-                return Optional.of(yarnUiBase.resolve(String.format("%s%s", internalLogUrl.getHost(), normalizedPath)));
-            case WITH_PORT:
-                return Optional.of(yarnUiBase.resolve(String.format("%s/port/%s%s",
-                        internalLogUrl.getHost(), internalLogUrl.getPort(), normalizedPath)));
-            case ORIGINAL:
-                return Optional.of(internalLogUrl);
-            default:
-                throw new AssertionError("Unknown LogConversionMode, shouldn't be reached");
-        }
+        return getRequest(uriProbe)
+                .map(any -> true)
+                .onErrorReturn(err -> false);
    }

    private Observable<URI> convertToPublicLogUri(final URI internalLogUri) {
-        // New version, without port info in log URL
-        return this.convertToPublicLogUri(this.getLogUriConversionMode(), internalLogUri)
-                .map(Observable::just)
-                .orElseGet(() -> {
-                    // Probe usable log URI
-                    LogConversionMode probeMode = YarnContainerLogFetcher.this.getLogUriConversionMode();
+        while (this.logConversionModes.hasNext()) {
+            // Try the next mode
+            final LogConversionMode probeMode = this.logConversionModes.next();
+            final URI uriProbe = probeMode.toPublic(internalLogUri);

-                    boolean isNoMoreTry = false;
-                    while (!isNoMoreTry) {
-                        Optional<URI> uri = this.convertToPublicLogUri(probeMode, internalLogUri)
-                                .filter(uriProbe -> isUriValid(uriProbe).toBlocking().firstOrDefault(false));
+            if (isUriValid(uriProbe).toBlocking().firstOrDefault(false)) {
+                // Found a usable one
+                log().debug("The Yarn log URL conversion mode is {} with pattern {}",
+                        probeMode.name, probeMode.publicPathTemplate);

-                        if (uri.isPresent()) {
-                            // Find usable one
-                            YarnContainerLogFetcher.this.setLogUriConversionMode(probeMode);
-                            return Observable.just(uri.get());
-                        }
+                return Observable.just(uriProbe);
+            }
+        }

-                        try {
-                            probeMode = LogConversionMode.next(probeMode);
-                        } catch (NoSuchElementException ignore) {
-                            log().warn("Can't find conversion mode of Yarn " + getYarnNMConnectUri());
-                            isNoMoreTry = true;
-                        }
-                    }
-
-                    // All modes were probed and all failed
-                    return Observable.empty();
-                });
+        // All modes were probed and all failed
+        log().warn("Can't find conversion mode of Yarn " + getYarnNMConnectUri());
+        return Observable.empty();
    }

    @Override
-    public Observable<Pair<String, Long>> fetch(final String type, final long logOffset, final int size) {
+    public Observable<String> fetch(final String type, final long logOffset, final int size) {
        return this.getSparkJobDriverLogUrl()
                .map(Object::toString)
                .flatMap(logUrl -> {
-                    long offset = logOffset;
+                    final long offset = StringUtils.equalsIgnoreCase(logUrl, this.getCurrentLogUrl())
+                            ? logOffset
+                            : 0L;

-                    if (!StringUtils.equalsIgnoreCase(logUrl, this.getCurrentLogUrl())) {
-                        this.setCurrentLogUrl(logUrl);
-                        offset = 0L;
-                    }
+                    this.setCurrentLogUrl(logUrl);

-                    String probedLogUrl = this.getCurrentLogUrl();
-                    if (probedLogUrl == null) {
-                        return Observable.empty();
-                    }
-
-                    String logGot = this.getInformationFromYarnLogDom(probedLogUrl, type, offset, size);
-
-                    if (StringUtils.isEmpty(logGot)) {
-                        return getYarnApplicationRequest()
-                                .flatMap(app -> {
-                                    if (isLogFetchable(app.getState())) {
-                                        return Observable.empty();
-                                    } else {
-                                        return Observable.just(Pair.of("", -1L));
-                                    }
-                                });
-                    } else {
-                        return Observable.just(Pair.of(logGot, offset));
-                    }
-                });
+                    return this.getContentFromYarnLogDom(logUrl, type, offset, size);
+                })
+                .repeatWhen(completed -> getYarnApplicationRequest()
+                        .filter(app -> isLogFetchable(app.getState()))
+                        .flatMap(app -> completed)
+                        .delay(1, TimeUnit.SECONDS))
+                .first();
    }

    private boolean isLogFetchable(final String status) {
@@ -292,87 +261,77 @@
    }

-    private String getInformationFromYarnLogDom(final String baseUrl,
-                                                final String type,
-                                                final long start,
-                                                final int size) {
-        URI url = URI.create(StringUtils.stripEnd(baseUrl, "/") + "/").resolve(
-                String.format("%s?start=%d", type, start) + (size <= 0 ? "" : String.format("&&end=%d", start + size)));
+    private Observable<String> getContentFromYarnLogDom(final String baseUrl,
+                                                        final String type,
+                                                        final long start,
+                                                        final int size) {
+        final URI url = UriUtils.normalizeWithSlashEnding(baseUrl).resolve(type);

-        List<NameValuePair> params = new ArrayList<>();
+        final List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("start", Long.toString(start)));

        if (size > 0) {
            params.add(new BasicNameValuePair("size", Long.toString(size)));
        }

-        String typedLogs = getHttp()
-                .requestWithHttpResponse(new HttpGet(url), null, params, null)
-                .doOnNext(response -> {
-                    try {
-                        log().debug("Fetch log from " + url + ", got " + response.getCode()
-                                + " with " + response.getMessage());
-                    } catch (IOException ignored) {
-                        // The upstream requestWithHttpResponse() has already get message buffered.
-                    }
-                })
+        return getRequest(url, params)
                .map(response -> {
-
-                    Document doc = null;
                    try {
-                        doc = Jsoup.parse(response.getMessage());
+                        return response.getMessage();
                    } catch (IOException ignored) {
                        // The upstream requestWithHttpResponse() has already get message buffered.
-                        throw new AssertionError("The upstream has got messages.");
+                        throw propagate(new AssertionError("The upstream has got messages."));
                    }
-
-                    Iterator<Element> iterator = Optional.ofNullable(doc.getElementById("navcell"))
-                            .map(Element::nextElementSibling)
-                            .map(Element::children)
-                            .map(ArrayList::iterator)
-                            .orElse(Collections.emptyIterator());
-
-                    HashMap<String, String> logTypeMap = new HashMap<>();
-                    final AtomicReference<String> logType = new AtomicReference<>();
-                    String logs = "";
-
-                    while (iterator.hasNext()) {
-                        Element node = iterator.next();
-
-                        if (StringUtils.equalsIgnoreCase(node.tagName(), "p")) {
-                            // In history server, need to read log type paragraph in page
-                            final Pattern logTypePattern = Pattern.compile("Log Type:\\s+(\\S+)");
-
-                            node.childNodes().stream()
-                                    .findFirst()
-                                    .map(Node::toString)
-                                    .map(StringUtils::trim)
-                                    .map(logTypePattern::matcher)
-                                    .filter(Matcher::matches)
-                                    .map(matcher -> matcher.group(1))
-                                    .ifPresent(logType::set);
-                        } else if (StringUtils.equalsIgnoreCase(node.tagName(), "pre")) {
-                            // In running, no log type paragraph in page
-                            logs = node.childNodes().stream()
-                                    .findFirst()
-                                    .map(Node::toString)
-                                    .orElse("");
-
-                            if (logType.get() != null) {
-                                // Only get the first <pre>...</pre>
-                                logTypeMap.put(logType.get(), logs);
-
-                                logType.set(null);
-                            }
-                        }
-                    }
-
-                    return logTypeMap.getOrDefault(type, logs);
                })
-                .doOnError(err -> log().warn("Can't parse information from YarnUI log page " + url, err))
-                .toBlocking()
-                .firstOrDefault("");
+                .flatMap(html -> {
+                    final String logs = parseLogsFromHtml(html).getOrDefault(type, StringUtils.EMPTY);

-        return typedLogs;
+                    return StringUtils.isEmpty(logs) ? Observable.empty() : Observable.just(logs);
+                })
+                .doOnError(err -> log().warn("Can't parse information from YarnUI log page " + url, err));
+    }
+
+    Map<String, String> parseLogsFromHtml(final String webPage) {
+        final Document doc = Jsoup.parse(webPage);
+        final Elements elements = Optional.ofNullable(doc.getElementById("navcell"))
+                .map(Element::nextElementSibling)
+                .map(Element::children)
+                .orElse(null);
+
+        if (elements == null) {
+            return emptyMap();
+        }
+
+        // Subject emitting each log type found in the page
+        final PublishSubject<String> logTypeWindowOpenings = PublishSubject.create();
+
+        return Observable.from(elements)
+                .doOnNext(node -> {
+                    if (StringUtils.equalsIgnoreCase(node.tagName(), "p")) {
+                        // In history server, need to read the log type paragraph in the page
+                        node.childNodes().stream()
+                                .findFirst()
+                                .map(this::findLogTypeDomNode)
+                                .ifPresent(logTypeWindowOpenings::onNext);
+                    }
+                })
+                .withLatestFrom(logTypeWindowOpenings, Pair::of)
+                .filter(nodeWithType ->
+                        // The log content starts from `pre`
+                        StringUtils.equalsIgnoreCase(nodeWithType.getFirst().tagName(), "pre")
+                                && !nodeWithType.getFirst().childNodes().isEmpty())
+                // Only take the first `<pre>` element
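+                // Pushing a synthetic "<type>-ending" marker below closes the window, so
+                // any later `pre` node of the same type is keyed differently in toMap()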
+                .doOnNext(nodeWithType -> logTypeWindowOpenings.onNext(nodeWithType.getRight() + "-ending"))
+                .toMap(Pair::getSecond, nodeWithType ->
+                        // Get the content as log
+                        String.valueOf(nodeWithType.getFirst().childNodes().get(0)))
+                .toBlocking()
+                .firstOrDefault(emptyMap());
+    }
+
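+    /**
+     * Extract the log type name from a DOM text node like "Log Type: stdout",
+     * returning "stdout"; return null when the node does not match LOG_TYPE_PATTERN.
+     */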
+    private @Nullable String findLogTypeDomNode(Node node) {
+        final Matcher matcher = LOG_TYPE_PATTERN.matcher(node.toString().trim());
+
+        return matcher.matches() ? matcher.group("type") : null;
     }
 
     public final String getApplicationId() {
@@ -396,7 +355,7 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
 
         return getHttp()
 //                .withUuidUserAgent()
-                .get(uri.toString(), null, null, AppResponse.class)
+                .get(uri.toString(), emptyList(), emptyList(), AppResponse.class)
                 .map(Pair::getFirst)
                 .map(AppResponse::getApp);
     }
@@ -406,8 +365,25 @@ public class YarnContainerLogFetcher implements SparkLogFetcher, Logger {
 
         return getHttp()
 //                .withUuidUserAgent()
-                .get(uri.toString(), null, null, AppAttemptsResponse.class)
+                .get(uri.toString(), emptyList(), emptyList(), AppAttemptsResponse.class)
                 .map(Pair::getFirst)
                 .map(appAttemptsResponse -> appAttemptsResponse.getAppAttempts().appAttempt);
     }
+
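+    /**
+     * Send a GET request and log the response status. Any HTTP error is propagated
+     * to the subscriber; isUriValid() relies on that to probe the conversion modes.
+     */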
+    private Observable<HttpResponse> getRequest(URI url, List<NameValuePair> params) {
+        return getHttp()
+                .requestWithHttpResponse(new HttpGet(url), null, params, emptyList())
+                .doOnNext(response -> {
+                    try {
+                        log().debug("Get page from " + url + ", got " + response.getCode()
+                                + " with " + response.getMessage());
+                    } catch (IOException ignored) {
+                        // The upstream requestWithHttpResponse() has already got the message buffered.
+                    }
+                });
+    }
+
+    private Observable<HttpResponse> getRequest(URI url) {
+        return getRequest(url, emptyList());
+    }
 }
diff --git a/src/main/java/com/microsoft/azure/spark/tools/processes/SparkJobLogInputStream.java b/src/main/java/com/microsoft/azure/spark/tools/processes/SparkJobLogInputStream.java
index c523e14..476b9e9 100644
--- a/src/main/java/com/microsoft/azure/spark/tools/processes/SparkJobLogInputStream.java
+++ b/src/main/java/com/microsoft/azure/spark/tools/processes/SparkJobLogInputStream.java
@@ -6,15 +6,11 @@ package com.microsoft.azure.spark.tools.processes;
 
 import com.microsoft.azure.spark.tools.job.SparkLogFetcher;
 import com.microsoft.azure.spark.tools.utils.LaterInit;
-import com.microsoft.azure.spark.tools.utils.Pair;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.NoSuchElementException;
 
-import static java.lang.Thread.sleep;
-
 public class SparkJobLogInputStream extends InputStream {
     private String logType;
    private LaterInit<SparkLogFetcher> sparkLogFetcher = new LaterInit<>();
@@ -35,24 +31,6 @@ public class SparkJobLogInputStream extends InputStream {
         return fetcher;
     }
 
-    private synchronized @Nullable Pair<String, Long> fetchLog(final long logOffset, final int fetchSize) {
-        try {
-            if (!isClosed && sparkLogFetcher.isInitialized()) {
-                return getAttachedLogFetcher()
-                        .fetch(getLogType(), logOffset, fetchSize)
-                        .toBlocking()
-                        .first();
-            }
-        } catch (NoSuchElementException ignored) {
-        }
-
-        return null;
-    }
-
-    public SparkLogFetcher getAttachedLogFetcher() {
-        return sparkLogFetcher.get();
-    }
-
     @Override
     public int read() throws IOException {
         if (isClosed) {
@@ -60,55 +38,23 @@ public class SparkJobLogInputStream extends InputStream {
         }
 
         if (bufferPos >= buffer.length) {
-            // throw new IOException("Beyond the buffer end, needs a new log fetch");
-            int avail;
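+            // The buffer is used up: block until the fetcher emits the next log slice,
+            // then restart reading from the head of the new buffer. An empty Observable
+            // (NoSuchElementException from first()) marks the end of the log stream.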
+            try {
+                final String logSlice = sparkLogFetcher.observable()
+                        .flatMap(fetcher -> fetcher.fetch(getLogType(), offset, -1))
+                        .toBlocking()
+                        .first();
 
-            do {
-                avail = available();
-
-                if (avail == -1) {
-                    return -1;
-                }
-
-                try {
-                    sleep(1000);
-                } catch (InterruptedException ignore) {
-                    return -1;
-                }
-
-            } while (avail == 0);
+                buffer = logSlice.getBytes();
+                bufferPos = 0;
+                offset += logSlice.length();
+            } catch (NoSuchElementException ignored) {
+                return -1;
+            }
         }
 
         return buffer[bufferPos++];
     }
 
-    @Override
-    public int available() throws IOException {
-        if (isClosed) {
-            return -1;
-        }
-
-        if (bufferPos >= buffer.length) {
-            Pair<String, Long> sliceOffsetPair = fetchLog(offset, -1);
-
-            if (sliceOffsetPair == null) {
-                return 0;
-            }
-
-            if (sliceOffsetPair.getValue() == -1L) {
-                return -1;
-            }
-
-            buffer = sliceOffsetPair.getKey().getBytes();
-            bufferPos = 0;
-            offset = sliceOffsetPair.getValue() + sliceOffsetPair.getKey().length();
-
-            return buffer.length;
-        } else {
-            return buffer.length - bufferPos;
-        }
-    }
-
     public String getLogType() {
         return logType;
     }
diff --git a/src/main/java/com/microsoft/azure/spark/tools/utils/UriUtils.java b/src/main/java/com/microsoft/azure/spark/tools/utils/UriUtils.java
new file mode 100644
index 0000000..d945c78
--- /dev/null
+++ b/src/main/java/com/microsoft/azure/spark/tools/utils/UriUtils.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+package com.microsoft.azure.spark.tools.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class UriUtils {
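+    // For example, normalizeWithSlashEnding(URI.create("http://host:8088/yarnui")) yields
+    // "http://host:8088/yarnui/", so that a later resolve("stderr") appends to the path
+    // instead of replacing its last segment.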
+    public static URI normalizeWithSlashEnding(final URI src) {
+        try {
+            return src.getPath().endsWith("/")
+                    ? src
+                    : new URI(
+                            src.getScheme(),
+                            src.getAuthority(),
+                            src.getPath() + "/",
+                            src.getQuery(),
+                            src.getFragment());
+        } catch (URISyntaxException x) {
+            throw new IllegalArgumentException(x.getMessage(), x);
+        }
+    }
+
+    public static URI normalizeWithSlashEnding(final String src) {
+        return normalizeWithSlashEnding(URI.create(src));
+    }
+}
diff --git a/src/test/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.java b/src/test/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.java
index ad5f65f..1a548ff 100644
--- a/src/test/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.java
+++ b/src/test/java/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.java
@@ -12,11 +12,17 @@ import cucumber.api.java.After;
 import cucumber.api.java.Before;
 import cucumber.api.java.en.Given;
 import cucumber.api.java.en.Then;
+import org.apache.commons.io.IOUtils;
 import uk.org.lidalia.slf4jtest.TestLogger;
 import uk.org.lidalia.slf4jtest.TestLoggerFactory;
 
+import java.io.File;
+import java.io.InputStream;
 import java.net.URI;
+import java.util.Collections;
+import java.util.Map;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
@@ -28,6 +34,7 @@ public class YarnContainerLogFetcherScenario {
     private YarnCluster yarnClusterMock;
     private YarnContainerLogFetcher yarnDriverLogFetcherMock;
     private TestLogger logger = TestLoggerFactory.getTestLogger(YarnContainerLogFetcher.class);
+    private Map<String, String> logsByType = Collections.emptyMap();
 
     @Before("@YarnContainerLogFetcherScenario")
     public void setUp() throws Throwable {
@@ -102,4 +109,28 @@ public class YarnContainerLogFetcherScenario {
     public void gettingSparkJobDriverLogURLObservableShouldBeEmpty() throws Throwable {
         assertTrue(yarnDriverLogFetcherMock.getSparkJobDriverLogUrl().isEmpty().toBlocking().last());
     }
+
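+    // The sample page is loaded from the test resources directory next to this class,
+    // e.g. src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLog01.html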
+    @Given("parse Yarn container log fetched from HTML page {string}")
+    public void parseYarnContainerLogFetchedFromHTMLPage(String webPageFileName) throws Throwable {
+        InputStream pageFileInput = getClass().getClassLoader().getResourceAsStream(
+                getClass().getPackage().getName().replace('.', '/')
+                        + "/" + webPageFileName);
+
+        String html = IOUtils.toString(pageFileInput, UTF_8);
+
+        logsByType = yarnDriverLogFetcherMock.parseLogsFromHtml(html);
+    }
+
+    @Then("check the type log {string} should start with {string}")
+    public void checkTheTypeLogDirectoryInfoShouldStartWithTemplate(String type, String expectStart) {
+        assertTrue("No such type log: " + type, logsByType.containsKey(type));
+        assertTrue("The type " + type + " log [" + logsByType.get(type).substring(0, 10)
+                + "] didn't start with " + expectStart, logsByType.get(type).startsWith(expectStart));
+    }
+
+    @Then("parse Yarn container log fetched from HTML {string}")
+    public void parseYarnContainerLogFetchedFromHTML(String html) {
+        logsByType = yarnDriverLogFetcherMock.parseLogsFromHtml(html);
+    }
 }
diff --git a/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLog01.html b/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLog01.html
new file mode 100644
index 0000000..e84246a
--- /dev/null
+++ b/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLog01.html
@@ -0,0 +1,267 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Logs for container_e01_1555654226340_0115_01_000001</title>
+  </head>
+  <body>
+    <div class="js-warning">
+      This page will not function without javascript enabled. Please enable javascript on your browser.
+    </div>
+    <table id="layout">
+      <tr>
+        <td id="navcell">
+          <div id="nav"></div>
+        </td>
+        <td class="content">
+          <p>Log Type: directory.info</p>
+          <p>Log Upload Time: Thu May 09 03:16:31 +0000 2019</p>
+          <p>Log Length: 6529</p>
+          <p>Showing 4096 bytes of 6529 total. Click <a href="#">here</a> for the full log.</p>
+          <pre>.template
+  8652041      4 -r-x------   1 yarn     hadoop       2316 May  9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-client.xml.example
+  8652018      4 -r-x------   1 yarn     hadoop          1 May  9 03:16 ./__spark_conf__/__hadoop_conf__/dfs.exclude
+  8652039      4 -r-x------   1 yarn     hadoop       1602 May  9 03:16 ./__spark_conf__/__hadoop_conf__/health_check
+  8652033     12 -r-x------   1 yarn     hadoop       9083 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hdfs-site.xml
+  8652022      4 -r-x------   1 yarn     hadoop        744 May  9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-client.xml
+  8652017      4 -r-x------   1 yarn     hadoop        259 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2-azure-file-system.properties
+  8652015      4 -r-x------   1 yarn     hadoop        241 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2-adl-file-system.properties
+  8652034      4 -r-x------   1 yarn     hadoop       1020 May  9 03:16 ./__spark_conf__/__hadoop_conf__/commons-logging.properties
+  8652194      4 -r-x------   1 yarn     hadoop        945 May  9 03:16 ./__spark_conf__/__hadoop_conf__/taskcontroller.cfg
+  8652037      8 -r-x------   1 yarn     hadoop       4221 May  9 03:16 ./__spark_conf__/__hadoop_conf__/task-log4j.properties
+  8652193      8 -r-x------   1 yarn     hadoop       4113 May  9 03:16 ./__spark_conf__/__hadoop_conf__/mapred-queues.xml.template
+  8652040      4 -r-x------   1 yarn     hadoop       1195 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hive-site.xml
+  8652020      4 -r-x------   1 yarn     hadoop        244 May  9 03:16 ./__spark_conf__/__hadoop_conf__/spark-thrift-fairscheduler.xml
+  8652011      8 -r-x------   1 yarn     hadoop       5730 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-env.sh
+  8652031      4 -r-x------   1 yarn     hadoop       1335 May  9 03:16 ./__spark_conf__/__hadoop_conf__/configuration.xsl
+  8652014      4 -r-x------   1 yarn     hadoop       2830 May  9 03:16 ./__spark_conf__/__hadoop_conf__/hadoop-metrics2.properties
+  8652032      8 -r-x------   1 yarn     hadoop       4302 May  9 03:16 ./__spark_conf__/__hadoop_conf__/yarn-env.sh
+  8652012      8 -r-x------   1 yarn     hadoop       8070 May  9 03:16 ./__spark_conf__/__hadoop_conf__/mapred-site.xml
+  8652044      4 -r-x------   1 yarn     hadoop        219 May  9 03:16 ./__spark_conf__/__hadoop_conf__/topology_mappings.data
+  8652088      4 -r-x------   1 yarn     hadoop       2697 May  9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-server.xml.example
+  8652045      4 -r-x------   1 yarn     hadoop       1000 May  9 03:16 ./__spark_conf__/__hadoop_conf__/ssl-server.xml
+  8652013     12 -r-x------   1 yarn     hadoop      10754 May  9 03:16 ./__spark_conf__/__hadoop_conf__/log4j.properties
+  8652028      4 -r-x------   1 yarn     hadoop       2444 May  9 03:16 ./__spark_conf__/__hadoop_conf__/capacity-scheduler.xml
+  8652016     28 -r-x------   1 yarn     hadoop      24964 May  9 03:16 ./__spark_conf__/__hadoop_conf__/yarn-site.xml
+  8652196      4 -r-x------   1 yarn     hadoop       3461 May  9 03:16 ./__spark_conf__/__spark_conf__.properties
+  8652008      4 -r-x------   1 yarn     hadoop       3635 May  9 03:16 ./__spark_conf__/log4j.properties
+  8652195    136 -r-x------   1 yarn     hadoop     135194 May  9 03:16 ./__spark_conf__/__spark_hadoop_conf__.xml
+  8652206      4 -rwx------   1 yarn     hadoop        759 May  9 03:16 ./default_container_executor.sh
+  8652203      4 -rw-r--r--   1 yarn     hadoop         60 May  9 03:16 ./.launch_container.sh.crc
+  8652199      4 drwx--x---   2 yarn     hadoop       4096 May  9 03:16 ./tmp
+  8652207      4 -rw-r--r--   1 yarn     hadoop         16 May  9 03:16 ./.default_container_executor.sh.crc
+  8652202      8 -rwx------   1 yarn     hadoop       6592 May  9 03:16 ./launch_container.sh
+  8652200      4 -rw-r--r--   1 yarn     hadoop         69 May  9 03:16 ./container_tokens
+  8652201      4 -rw-r--r--   1 yarn     hadoop         12 May  9 03:16 ./.container_tokens.crc
+broken symlinks(find -L . -maxdepth 5 -type l -ls):
+</pre>
+
+          <p>Log Type: prelaunch.err</p>
+          <p>Log Upload Time: Thu May 09 03:16:31 +0000 2019</p>
+          <p>Log Length: 0</p>
+          <pre></pre>
+
+          <p>Log Type: stdout</p>
+          <p>Log Upload Time: Thu May 09 03:16:41 +0000 2019</p>
+          <p>Log Length: 83</p>
+          <pre>(10.10.10.10,"FRED",GET http://images.com/2013/Generic.jpg HTTP/1.1)	bytes=621	n=2
+</pre>
+
+          <p>Log Type: stderr</p>
+          <p>Log Upload Time: Thu May 09 03:16:42 +0000 2019</p>
+          <p>Log Length: 42950</p>
+          <p>Showing 4096 bytes of 42950 total. Click <a href="#">here</a> for the full log.</p>
+          <pre>.0E-4, max=1.713311, mean=0.03784145849472171, stddev=0.24547936947579752, median=0.0015999999999999999, p75=0.002, p95=0.0050999999999999995, p98=1.713311, p99=1.713311, p999=1.713311, mean_rate=7.117206991134792, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.listenerProcessingTime.org.apache.spark.status.AppStatusListener, count=44, min=0.0024, max=31.023896999999998, mean=2.4547524526277567, stddev=5.876043399731473, median=0.204202, p75=1.60211, p95=14.707092999999999, p98=31.023896999999998, p99=31.023896999999998, p999=31.023896999999998, mean_rate=5.921080739448543, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.appStatus.listenerProcessingTime, count=44, min=0.0235, max=31.059396999999997, mean=2.4949896756423464, stddev=5.875936370492155, median=0.232602, p75=1.6312099999999998, p95=14.777693999999999, p98=31.059396999999997, p99=31.059396999999997, p999=31.059396999999997, mean_rate=5.919504362289072, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.eventLog.listenerProcessingTime, count=44, min=0.015, max=68.757137, mean=8.468596610427415, stddev=13.398995984218377, median=1.301309, p75=20.724231, p95=31.845902, p98=68.757137, p99=68.757137, p999=68.757137, mean_rate=7.096692322900792, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.executorManagement.listenerProcessingTime, count=44, min=0.015201, max=4.030926, mean=0.2928407880846266, stddev=0.8303335717416928, median=0.0339, p75=0.061001, p95=1.59211, p98=4.030926, p99=4.030926, p999=4.030926, mean_rate=6.589188634560831, m1=1.0, m5=1.0, m15=1.0, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO metrics: type=TIMER, name=application_1555654226340_0115.driver.LiveListenerBus.queue.shared.listenerProcessingTime, count=44, min=0.030601, max=95.799408, mean=3.1430500529808088, stddev=14.354981642887365, median=0.355602, p75=0.899106, p95=5.006031, p98=95.799408, p99=95.799408, p999=95.799408, mean_rate=7.112904958727032, m1=1.4, m5=1.4, m15=1.4, rate_unit=events/second, duration_unit=milliseconds
+19/05/09 03:16:41 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
+19/05/09 03:16:41 INFO YarnClusterSchedulerBackend: Shutting down all executors
+19/05/09 03:16:41 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
+19/05/09 03:16:41 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
+(serviceOption=None,
+ services=List(),
+ started=false)
+19/05/09 03:16:41 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
+19/05/09 03:16:42 INFO MemoryStore: MemoryStore cleared
+19/05/09 03:16:42 INFO BlockManager: BlockManager stopped
+19/05/09 03:16:42 INFO BlockManagerMaster: BlockManagerMaster stopped
+19/05/09 03:16:42 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
+19/05/09 03:16:42 INFO SparkContext: Successfully stopped SparkContext
+19/05/09 03:16:42 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
+19/05/09 03:16:42 INFO ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
+19/05/09 03:16:42 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
+19/05/09 03:16:42 INFO ApplicationMaster: Deleting staging directory adl://home/user/livy/.sparkStaging/application_1555654226340_0115
+19/05/09 03:16:42 INFO ShutdownHookManager: Shutdown hook called
+19/05/09 03:16:42 INFO ShutdownHookManager: Deleting directory /mnt/resource/hadoop/yarn/local/usercache/livy/appcache/application_1555654226340_0115/spark-ddcde63e-131e-4470-a79a-4f9c43f7df53
+</pre>
+
+          <p>Log Type: launch_container.sh</p>
+          <p>Log Upload Time: Thu May 09 03:16:31 +0000 2019</p>
+          <p>Log Length: 6592</p>
+          <p>Showing 4096 bytes of 6592 total. Click <a href="#">here</a> for the full log.</p>
+          <pre>ework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/2.6.5.3006-29/hadoop/lib/hadoop-lzo-0.6.0.2.6.5.3006-29.jar:/etc/hadoop/conf/secure:/usr/lib/hdinsight-datalake/*::/usr/hdp/current/spark2-client/jars/*:/usr/lib/hdinsight-datalake/*:/usr/hdp/current/spark_llap/*:/usr/hdp/current/spark2-client/conf::$PWD/__spark_conf__/__hadoop_conf__"
+export HADOOP_TOKEN_FILE_LOCATION="/mnt/resource/hadoop/yarn/local/usercache/livy/appcache/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/container_tokens"
+export NM_AUX_SERVICE_spark_shuffle=""
+export SPARK_USER="livy"
+export LOCAL_USER_DIRS="/mnt/resource/hadoop/yarn/local/usercache/livy/"
+export HADOOP_HOME="/usr/hdp/2.6.5.3006-29/hadoop"
+export HOME="/home/"
+export NM_AUX_SERVICE_spark2_shuffle=""
+export CONTAINER_ID="container_e01_1555654226340_0115_01_000001"
+export MALLOC_ARENA_MAX="4"
+echo "Setting up job resources"
+ln -sf "/mnt/resource/hadoop/yarn/local/usercache/livy/filecache/306/__spark_conf__.zip" "__spark_conf__"
+ln -sf "/mnt/resource/hadoop/yarn/local/usercache/livy/filecache/307/default_artifact.jar" "__app__.jar"
+echo "Copying debugging information"
+# Creating copy of launch script
+cp "launch_container.sh" "/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/launch_container.sh"
+chmod 640 "/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/launch_container.sh"
+# Determining directory contents
+echo "ls -l:" 1>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+ls -l 1>>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+echo "find -L . -maxdepth 5 -ls:" 1>>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+find -L . -maxdepth 5 -ls 1>>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+echo "broken symlinks(find -L . -maxdepth 5 -type l -ls):" 1>>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+find -L . -maxdepth 5 -type l -ls 1>>"/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/directory.info"
+echo "Launching container"
+exec /bin/bash -c "LD_LIBRARY_PATH="/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64:$LD_LIBRARY_PATH" $JAVA_HOME/bin/java -server -Xmx1024m -Djava.io.tmpdir=$PWD/tmp -Dhdp.version=2.6.5.3006-29 '-Detwlogger.component=sparkdriver' '-DlogFilter.filename=SparkLogFilters.xml' '-DpatternGroup.filename=SparkPatternGroups.xml' '-Dlog4jspark.root.logger=INFO,console,RFA,ETW,Anonymizer' '-Dlog4jspark.log.dir=/var/log/sparkapp/\${user.name}' '-Dlog4jspark.log.file=sparkdriver.log' '-Dlog4j.configuration=file:/usr/hdp/current/spark2-client/conf/log4j.properties' '-Djavax.xml.parsers.SAXParserFactory=com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl' '-XX:+UseParallelGC' '-XX:+UseParallelOldGC' -Dspark.yarn.app.container.log.dir=/mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001 org.apache.spark.deploy.yarn.ApplicationMaster --class 'sample.LogQuery' --jar adl://zhwespkwesteu.azuredatalakestore.net/clusters/zhwe-spk23-adlsgen1/SparkSubmission/2019/04/28/4c017893-dd57-4a01-8bf9-593f002cd391/default_artifact.jar --properties-file $PWD/__spark_conf__/__spark_conf__.properties 1> /mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/stdout 2> /mnt/resource/hadoop/yarn/log/application_1555654226340_0115/container_e01_1555654226340_0115_01_000001/stderr"
+</pre>
+
+          <p>Log Type: prelaunch.out</p>
+          <p>Log Upload Time: Thu May 09 03:16:31 +0000 2019</p>
+          <p>Log Length: 100</p>
+          <pre>Setting up env variables
+Setting up job resources
+Copying debugging information
+Launching container
+</pre>
+        </td>
+      </tr>
+    </table>
+  </body>
+</html>
diff --git a/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.feature b/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.feature
index cec9485..0b6f8b8 100644
--- a/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.feature
+++ b/src/test/resources/com/microsoft/azure/spark/tools/job/YarnContainerLogFetcherScenario.feature
@@ -36,3 +36,16 @@ Feature: YarnContainerLogFetcher unit tests
    And create a yarn application driver with id application_1513565654634_0011
    And setup a mock Yarn service for GET request '/yarnui/ws/v1/cluster/apps/application_1513565654634_0011/appattempts' to return '{"appAttempts":{"appAttempt":[{"id":1,"startTime":1513673984219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000001","nodeHttpAddress":"10.0.0.6:30060","nodeId":"10.0.0.6:30050","logsLink":"http://10.0.0.6:30060/node/containerlogs/container_1513565654634_0011_01_000001/livy","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000001"},{"id":2,"startTime":1513673985219,"finishedTime":0,"containerId":"container_1513565654634_0011_01_000002","nodeHttpAddress":"10.0.0.7:30060","nodeId":"10.0.0.7:30050","logsLink":"","blacklistedNodes":"","appAttemptId":"appattempt_1513565654634_0011_000002"}]}}' with status code 200
    Then getting Spark Job driver log URL Observable should be empty
+
+  Scenario: parse logs from HTML page
+    Given prepare a Yarn cluster with Node Manager base URL http://127.0.0.1:$port/yarnui/ws/v1/cluster/apps/ and UI base URL http://127.0.0.1:$port/yarnui/
+    And create a yarn application driver with id mockId
+    And parse Yarn container log fetched from HTML page 'YarnContainerLog01.html'
+    Then check the type log 'directory.info' should start with '.template'
+    Then check the type log 'stdout' should start with '(10.10.10.10'
+    Then check the type log 'stderr' should start with '.0E-4, max=1.7133'
+
+  Scenario: parse logs from an empty HTML page
+    Given prepare a Yarn cluster with Node Manager base URL http://127.0.0.1:$port/yarnui/ws/v1/cluster/apps/ and UI base URL http://127.0.0.1:$port/yarnui/
+    And create a yarn application driver with id mockId
+    Then parse Yarn container log fetched from HTML ''