1. Enable Coding Style check and fix style errors.
2. Enable Java header check and fix copyright and license headers.
Wei Zhang 2019-04-03 10:19:50 +08:00
Parent 28c66e9f7c
Commit 6174155fdf
44 changed files with 4452 additions and 0 deletions

checkstyle.xml Normal file

@@ -0,0 +1,232 @@
<?xml version="1.0"?>
<!DOCTYPE module PUBLIC
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
"https://checkstyle.org/dtds/configuration_1_3.dtd">
<!--
Checkstyle configuration that checks the coding conventions from:
- the Sun Code Conventions at https://www.oracle.com/technetwork/java/codeconvtoc-136057.html
- the Javadoc guidelines at
https://www.oracle.com/technetwork/java/javase/documentation/index-137868.html
- the Google Java Style at https://google.github.io/styleguide/javaguide.html
- some best practices
Checkstyle is very configurable. Be sure to read the documentation at
http://checkstyle.sourceforge.net (or in your downloaded distribution).
Most Checks are configurable, be sure to consult the documentation.
To completely disable a check, just comment it out or delete it from the file.
-->
<module name="Checker">
<!--
If you set the basedir property below, then all reported file
names will be relative to the specified directory. See
https://checkstyle.org/5.x/config.html#Checker
<property name="basedir" value="${basedir}"/>
-->
<property name="charset" value="UTF-8"/>
<property name="fileExtensions" value="java, properties, xml"/>
<module name="Header">
<property name="headerFile" value="conf/java.header"/>
<property name="ignoreLines" value="2, 3, 4"/>
<property name="fileExtensions" value="java"/>
</module>
<!-- Excludes all 'module-info.java' files -->
<!-- See https://checkstyle.org/config_filefilters.html -->
<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="module\-info\.java$"/>
</module>
<!-- Checks that property files contain the same keys. -->
<!-- See http://checkstyle.sourceforge.net/config_misc.html#Translation -->
<module name="Translation"/>
<!-- Checks for Size Violations. -->
<!-- See http://checkstyle.sourceforge.net/config_sizes.html -->
<module name="FileLength"/>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sourceforge.net/config_whitespace.html -->
<module name="FileTabCharacter"/>
<!-- Miscellaneous other checks. -->
<!-- See http://checkstyle.sourceforge.net/config_misc.html -->
<module name="RegexpSingleline">
<property name="format" value="\s+$"/>
<property name="minimum" value="0"/>
<property name="maximum" value="2"/>
<property name="message" value="Line has trailing spaces."/>
</module>
<!-- Checks for Headers -->
<!-- See http://checkstyle.sourceforge.net/config_header.html -->
<!-- <module name="Header"> -->
<!-- <property name="headerFile" value="${checkstyle.header.file}"/> -->
<!-- <property name="fileExtensions" value="java"/> -->
<!-- </module> -->
<module name="TreeWalker">
<module name="NoLineWrap"/>
<!-- Checks for Javadoc comments. -->
<!-- See http://checkstyle.sourceforge.net/config_javadoc.html -->
<module name="JavadocMethod">
<property name="scope" value="public"/>
<property name="allowMissingParamTags" value="true"/>
<property name="allowMissingThrowsTags" value="true"/>
<property name="allowMissingReturnTag" value="true"/>
<property name="minLineCount" value="2"/>
<property name="allowedAnnotations" value="Override, Test"/>
<property name="allowThrowsTagsForSubclasses" value="true"/>
<property name="severity" value="warning"/>
</module>
<module name="JavadocStyle"/>
<module name="JavadocTagContinuationIndentation"/>
<module name="SummaryJavadoc">
<property name="forbiddenSummaryFragments"
value="^@return the *|^This method returns |^A [{]@code [a-zA-Z0-9]+[}]( is a )"/>
</module>
<module name="JavadocParagraph"/>
<module name="AtclauseOrder">
<property name="tagOrder" value="@param, @return, @throws, @deprecated"/>
<property name="target"
value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF, VARIABLE_DEF"/>
</module>
<module name="SingleLineJavadoc">
<property name="ignoreInlineTags" value="false"/>
</module>
<!-- Checks for Naming Conventions. -->
<!-- See http://checkstyle.sourceforge.net/config_naming.html -->
<module name="LocalVariableName">
<property name="tokens" value="VARIABLE_DEF"/>
<property name="format" value="^[a-z]([a-z0-9][a-zA-Z0-9]*)?$"/>
<message key="name.invalidPattern"
value="Local variable name ''{0}'' must match pattern ''{1}''."/>
</module>
<module name="MemberName"/>
<module name="MethodName"/>
<module name="PackageName"/>
<module name="ParameterName"/>
<module name="TypeName"/>
<!-- Checks for imports -->
<!-- See http://checkstyle.sourceforge.net/config_import.html -->
<module name="AvoidStarImport"/>
<module name="IllegalImport"/> <!-- defaults to sun.* packages -->
<module name="RedundantImport"/>
<module name="UnusedImports">
<property name="processJavadoc" value="false"/>
</module>
<!-- Checks for Size Violations. -->
<!-- See http://checkstyle.sourceforge.net/config_sizes.html -->
<module name="LineLength">
<property name="max" value="120"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
<module name="MethodLength"/>
<!-- Checks for whitespace -->
<!-- See http://checkstyle.sourceforge.net/config_whitespace.html -->
<module name="EmptyForIteratorPad"/>
<module name="GenericWhitespace"/>
<module name="Indentation">
<property name="basicOffset" value="4"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="arrayInitIndent" value="4"/>
</module>
<module name="MethodParamPad"/>
<module name="NoWhitespaceAfter"/>
<module name="NoWhitespaceBefore"/>
<module name="OperatorWrap">
<property name="option" value="NL"/>
<property name="tokens"
value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR,
LT, MINUS, MOD, NOT_EQUAL, PLUS, QUESTION, SL, SR, STAR, METHOD_REF "/>
</module>
<module name="ParenPad"/>
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
<!-- Modifier Checks -->
<!-- See http://checkstyle.sourceforge.net/config_modifiers.html -->
<module name="ModifierOrder"/>
<!-- Checks for blocks. You know, those {}'s -->
<!-- See http://checkstyle.sourceforge.net/config_blocks.html -->
<module name="AvoidNestedBlocks"/>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
<property name="tokens"
value="LITERAL_TRY, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE, LITERAL_SWITCH"/>
</module>
<module name="LeftCurly"/>
<module name="NeedBraces"/>
<module name="RightCurly">
<property name="id" value="RightCurlySame"/>
<property name="tokens"
value="LITERAL_TRY, LITERAL_CATCH, LITERAL_FINALLY, LITERAL_IF, LITERAL_ELSE,
LITERAL_DO"/>
</module>
<module name="RightCurly">
<property name="id" value="RightCurlyAlone"/>
<property name="option" value="alone"/>
<property name="tokens"
value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, STATIC_INIT,
INSTANCE_INIT"/>
</module>
<!-- Checks for common coding problems -->
<!-- See http://checkstyle.sourceforge.net/config_coding.html -->
<module name="EmptyStatement"/>
<module name="EqualsHashCode"/>
<module name="HiddenField">
<property name="ignoreSetter" value="true" />
<property name="ignoreConstructorParameter" value="true" />
</module>
<module name="IllegalInstantiation"/>
<module name="InnerAssignment"/>
<module name="MissingSwitchDefault"/>
<module name="SimplifyBooleanExpression"/>
<module name="SimplifyBooleanReturn"/>
<!-- Checks for class design -->
<!-- See http://checkstyle.sourceforge.net/config_design.html -->
<module name="FinalClass"/>
<module name="InterfaceIsType"/>
<!-- Miscellaneous other checks. -->
<!-- See http://checkstyle.sourceforge.net/config_misc.html -->
<module name="ArrayTypeStyle"/>
<module name="FinalParameters">
<property name="severity" value="warning"/>
</module>
<module name="TodoComment"/>
<module name="UpperEll"/>
<module name="CommentsIndentation"/>
</module>
<module name="SuppressionFilter">
<property name="file" value="suppressions.xml"/>
</module>
</module>
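A note on the Header module above: it compares each Java file against conf/java.header (shown next), with ignoreLines exempting lines 2, 3, and 4 of the header from exact comparison. As a quick illustration, a file like the following would satisfy the header, indentation, operator-wrap, and Javadoc rules configured here; the class and members are hypothetical, made up for this sketch:

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.demo;

/**
 * A hypothetical class showing code that passes the configuration above.
 */
public class StyleExample {
    private final int threshold;

    public StyleExample(final int threshold) {
        this.threshold = threshold;
    }

    /**
     * Check whether the sum of two addends exceeds the threshold.
     */
    public boolean isOverThreshold(final int first, final int second) {
        // OperatorWrap(NL): the wrapped operator leads the continuation line
        return first + second
                > this.threshold;
    }
}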

conf/java.header Normal file

@@ -0,0 +1,2 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

pom.xml Normal file

@@ -0,0 +1,277 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-spark-java-sdk</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>Azure Spark SDK for Java</name>
<description>Azure Spark SDK for Java provides support for Azure Spark services, such as HDInsight, Spark on Cosmos, Spark on SQL Big Data, and so on. The SDK has the following Spark-related features, including, but not limited to: 1. APIs to manage Spark applications; 2. APIs to manage Spark clusters; 3. APIs to monitor and debug running Spark applications</description>
<url>https://github.com/microsoft/azure-spark-java-sdk</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<codehaus.plexus-utils.version>3.0.20</codehaus.plexus-utils.version>
<reflections.version>0.9.11</reflections.version>
<junit.version>4.12</junit.version>
<mockito.version>2.22.0</mockito.version>
<kotlin.version>1.3.20</kotlin.version>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
<semver.version>0.9.0</semver.version>
<jackson.version>2.9.7</jackson.version>
<!--suppress UnresolvedMavenProperty -->
<annotatedJdk>${org.checkerframework:jdk8:jar}</annotatedJdk>
</properties>
<licenses>
<license>
<name>MIT License</name>
<url>https://opensource.org/licenses/MIT</url>
<distribution>repo</distribution>
</license>
</licenses>
<profiles>
<profile>
<id>doclint-java8-disable</id>
<activation>
<jdk>[1.8,)</jdk>
</activation>
<properties>
<javadoc.opts>-Xdoclint:none</javadoc.opts>
</properties>
</profile>
<profile>
<id>dev</id>
<activation>
<property>
<name>!release</name>
</property>
</activation>
<properties>
</properties>
</profile>
<profile>
<id>release</id>
<activation>
<property>
<name>release</name>
</property>
</activation>
<properties>
</properties>
</profile>
<profile>
<id>it</id>
<build>
</build>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
<version>${codehaus.plexus-utils.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
<dependency>
<groupId>com.github.zafarkhaja</groupId>
<artifactId>java-semver</artifactId>
<version>${semver.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.hdinsight.v2018_06_01_preview</groupId>
<artifactId>azure-mgmt-hdinsight</artifactId>
<version>1.0.0-beta-2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>net.sourceforge.htmlunit</groupId>
<artifactId>htmlunit</artifactId>
<version>2.26</version>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>2.5.8</version>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>jdk8</artifactId>
<version>2.5.8</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.checkerframework</groupId>-->
<!--<artifactId>checker</artifactId>-->
<!--<version>2.5.8</version>-->
<!--</dependency>-->
<!-- TEST -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.0.0</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.19</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<!-- Kotlin is only enabled for testing-->
<!-- <execution>-->
<!-- <id>compile</id>-->
<!-- <goals> <goal>compile</goal> </goals>-->
<!-- <configuration>-->
<!-- <sourceDirs>-->
<!-- <sourceDir>${project.basedir}/src/main/java</sourceDir>-->
<!-- </sourceDirs>-->
<!-- </configuration>-->
<!-- </execution>-->
<execution>
<id>test-compile</id>
<goals> <goal>test-compile</goal> </goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/test/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>properties</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>${java.source.version}</source>
<target>${java.target.version}</target>
<compilerArguments>
<Xmaxerrs>10000</Xmaxerrs>
<Xmaxwarns>10000</Xmaxwarns>
</compilerArguments>
<annotationProcessorPaths>
<path>
<groupId>org.checkerframework</groupId>
<artifactId>checker</artifactId>
<version>2.5.8</version>
</path>
</annotationProcessorPaths>
<annotationProcessors>
<!-- Add all the checkers you want to enable here -->
<annotationProcessor>org.checkerframework.checker.nullness.NullnessChecker</annotationProcessor>
<annotationProcessor>org.checkerframework.checker.formatter.FormatterChecker</annotationProcessor>
</annotationProcessors>
<compilerArgs>
<!-- location of the annotated JDK, which comes from a Maven dependency -->
<!-- <arg>-Xbootclasspath/p:${annotatedJdk}</arg>-->
<arg>-AskipUses=org\.apache\.commons\.lang3\.</arg>
<arg>-AsuppressWarnings=incompatible</arg>
<!-- -Awarns turns type-checking warnings into errors. -->
<!-- <arg>-Awarns</arg>-->
</compilerArgs>
</configuration>
<executions>
<!-- Replacing default-compile as it is treated specially by maven -->
<execution>
<id>default-compile</id>
<phase>none</phase>
</execution>
<!-- Replacing default-testCompile as it is treated specially by maven -->
<execution>
<id>default-testCompile</id>
<phase>none</phase>
</execution>
<execution>
<id>java-compile</id>
<phase>compile</phase>
<goals> <goal>compile</goal> </goals>
</execution>
<execution>
<id>java-test-compile</id>
<phase>test-compile</phase>
<goals> <goal>testCompile</goal> </goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
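Two notes on the build above: the Checkstyle execution is bound to the validate phase, so a plain `mvn validate` (or any later phase) already runs the style and header checks; and the compiler plugin registers the Checker Framework NullnessChecker as an annotation processor. A minimal sketch of the kind of code that checker rejects and accepts (the class and field names are made up):

package com.microsoft.azure.spark.tools.demo;

import org.checkerframework.checker.nullness.qual.Nullable;

public class NullnessDemo {
    @Nullable
    private String state;

    public int stateLength() {
        // Rejected by the checker, since state may be null here:
        // return state.length();   // error: [dereference.of.nullable]

        // Accepted: the null test refines @Nullable String to String
        String current = this.state;
        return current == null ? 0 : current.length();
    }
}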

src/main/java/com/microsoft/azure/spark/tools/clusters/ClusterDetail.java Normal file

@@ -0,0 +1,104 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.clusters;
import com.microsoft.azure.spark.tools.errors.HDIException;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
/**
* The interface is for Spark cluster properties.
*/
public interface ClusterDetail {
default boolean isEmulator() {
return false;
}
default boolean isConfigInfoAvailable() {
return false;
}
String getName();
String getTitle();
@Nullable
default String getState() {
return null;
}
@Nullable
default String getLocation() {
return null;
}
String getConnectionUrl();
@Nullable
default String getCreateDate() {
return null;
}
// default ClusterType getType() {
// return null;
// }
@Nullable
default String getVersion() {
return null;
}
// SubscriptionDetail getSubscription();
default int getDataNodes() {
return 0;
}
@Nullable
default String getHttpUserName() throws HDIException {
return null;
}
@Nullable
default String getHttpPassword() throws HDIException {
return null;
}
@Nullable
default String getOSType() {
return null;
}
@Nullable
default String getResourceGroup() {
return null;
}
// @Nullable
// default IHDIStorageAccount getStorageAccount() throws HDIException {
// return null;
// }
// default List<HDStorageAccount> getAdditionalStorageAccounts() {
// return null;
// }
default void getConfigurationInfo() throws IOException, HDIException {
}
@Nullable
default String getSparkVersion() {
return null;
}
// default SparkSubmitStorageType getDefaultStorageType(){
// return SparkSubmitStorageType.DEFAULT_STORAGE_ACCOUNT;
// }
//
// default SparkSubmitStorageTypeOptionsForCluster getStorageOptionsType(){
// return SparkSubmitStorageTypeOptionsForCluster.ClusterWithFullType;
// }
}
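Everything in ClusterDetail except getName, getTitle, and getConnectionUrl carries a default, so a minimal implementation needs only those three methods. A hypothetical sketch (not part of this commit):

package com.microsoft.azure.spark.tools.clusters;

public class PlainClusterDetail implements ClusterDetail {
    @Override
    public String getName() {
        return "my-cluster";
    }

    @Override
    public String getTitle() {
        return getName();
    }

    @Override
    public String getConnectionUrl() {
        return "https://my-cluster.azurehdinsight.net";
    }
}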

src/main/java/com/microsoft/azure/spark/tools/clusters/HdiClusterDetail.java Normal file

@@ -0,0 +1,168 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.clusters;
import com.microsoft.azure.arm.resources.Region;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.Cluster;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.ClusterGetProperties;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.ClusterIdentity;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.ConnectivityEndpoint;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.implementation.ClusterInner;
import com.microsoft.azure.management.hdinsight.v2018_06_01_preview.implementation.HDInsightManager;
import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import rx.Observable;
import java.util.Map;
/**
* The HDInsight Cluster class.
*/
public final class HdiClusterDetail implements ClusterDetail, LivyCluster, YarnCluster, Cluster {
private final boolean isGatewayRestAuthCredentialEnabled;
private final Map<String, String> coreSiteConfig;
private final Map<String, String> gatewayConf;
private final Cluster cluster;
public String getYarnUIBaseUrl() {
return this.getConnectionUrl() + "/yarnui/";
}
public String getLivyConnectionUrl() {
return this.getConnectionUrl() + "/livy/";
}
public String getYarnNMConnectionUrl() {
return this.getYarnUIBaseUrl() + "ws/v1/clusters/apps/";
}
public String getName() {
return cluster.name();
}
public String getTitle() {
return this.getName() + " @" + this.regionName();
}
public String getConnectionUrl() {
ConnectivityEndpoint httpConnEP = inner().properties().connectivityEndpoints()
.stream()
.filter(it -> StringUtils.equalsIgnoreCase(it.name(), "HTTPS"))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(
"Can't find HTTPS connection entity from HDInsight cluster properties"));
// such as https://hdicluster.azurehdinsight.net
return httpConnEP.name().toLowerCase() + "://" + httpConnEP.location();
}
public boolean isGatewayRestAuthCredentialEnabled() {
return this.isGatewayRestAuthCredentialEnabled;
}
@Nullable
public String getHttpUserName() {
return this.isGatewayRestAuthCredentialEnabled
? this.gatewayConf.get(GatewayRestAuthCredentialConfigKey.USERNAME.getKey())
: null;
}
@Nullable
public String getHttpPassword() {
return this.isGatewayRestAuthCredentialEnabled
? this.gatewayConf.get(GatewayRestAuthCredentialConfigKey.PASSWORD.getKey())
: null;
}
public HdiClusterDetail(Cluster cluster, Map<String, String> coreSiteConfig, Map<String, String> gatewayConf) {
super();
this.cluster = cluster;
this.coreSiteConfig = coreSiteConfig;
this.gatewayConf = gatewayConf;
String isGWAuthCredEnabled = this.gatewayConf.get(GatewayRestAuthCredentialConfigKey.ENABLED.getKey());
this.isGatewayRestAuthCredentialEnabled = isGWAuthCredEnabled != null
&& StringUtils.equalsIgnoreCase(isGWAuthCredEnabled, "true");
}
public String etag() {
return this.cluster.etag();
}
public String id() {
return this.cluster.id();
}
public ClusterIdentity identity() {
return this.cluster.identity();
}
public ClusterInner inner() {
return this.cluster.inner();
}
public String key() {
return this.cluster.key();
}
public HDInsightManager manager() {
return this.cluster.manager();
}
public String name() {
return this.cluster.name();
}
public ClusterGetProperties properties() {
return this.cluster.properties();
}
public Cluster refresh() {
return this.cluster.refresh();
}
public Observable<Cluster> refreshAsync() {
return this.cluster.refreshAsync();
}
public Region region() {
return this.cluster.region();
}
public String regionName() {
return this.cluster.regionName();
}
public String resourceGroupName() {
return this.cluster.resourceGroupName();
}
public Map<String, String> tags() {
return this.cluster.tags();
}
public String type() {
return this.cluster.type();
}
public Update update() {
return this.cluster.update();
}
private enum GatewayRestAuthCredentialConfigKey {
ENABLED("restAuthCredential.isEnabled"),
USERNAME("restAuthCredential.username"),
PASSWORD("restAuthCredential.password");
private final String key;
public final String getKey() {
return this.key;
}
GatewayRestAuthCredentialConfigKey(String key) {
this.key = key;
}
}
}

src/main/java/com/microsoft/azure/spark/tools/clusters/LivyCluster.java Normal file

@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.clusters;
import java.net.URI;
public interface LivyCluster {
String getLivyConnectionUrl();
default String getLivyBatchUrl() {
return URI.create(getLivyConnectionUrl()).resolve("batches").toString();
}
default String getLivySessionUrl() {
return URI.create(getLivyConnectionUrl()).resolve("sessions").toString();
}
}
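The default methods rely on URI.resolve, which is sensitive to a trailing slash; this is why HdiClusterDetail.getLivyConnectionUrl above appends "/livy/" rather than "/livy". A small sketch of the difference (the host name is made up):

import java.net.URI;

public class ResolveDemo {
    public static void main(String[] args) {
        // With a trailing slash, the last path segment is kept:
        System.out.println(URI.create("https://host/livy/").resolve("batches"));
        // prints https://host/livy/batches

        // Without it, "livy" is replaced by the relative reference:
        System.out.println(URI.create("https://host/livy").resolve("batches"));
        // prints https://host/batches
    }
}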

src/main/java/com/microsoft/azure/spark/tools/clusters/YarnCluster.java Normal file

@@ -0,0 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.clusters;
public interface YarnCluster {
String getYarnNMConnectionUrl();
String getYarnUIBaseUrl();
}

src/main/java/com/microsoft/azure/spark/tools/errors/HDIException.java Normal file

@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.errors;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* The base exception class for HDInsight cluster.
*/
public class HDIException extends Exception {
private String mErrorLog;
private int errorCode;
public HDIException(String message) {
super(message);
mErrorLog = "";
}
public HDIException(String message, int errorCode) {
super(message);
this.errorCode = errorCode;
mErrorLog = "";
}
public HDIException(String message, String errorLog) {
super(message);
mErrorLog = errorLog;
}
public HDIException(String message, Throwable throwable) {
super(message, throwable);
if (throwable instanceof HDIException) {
mErrorLog = ((HDIException) throwable).getErrorLog();
} else {
StringWriter sw = new StringWriter();
PrintWriter writer = new PrintWriter(sw);
throwable.printStackTrace(writer);
writer.flush();
mErrorLog = sw.toString();
}
}
public String getErrorLog() {
return mErrorLog;
}
public int getErrorCode() {
return errorCode;
}
}
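For reference, the Throwable constructor above captures the cause's full stack trace into the error log, unless the cause is itself an HDIException, in which case the original error log is propagated. A small usage sketch (the class and variable names are illustrative):

package com.microsoft.azure.spark.tools.errors;

public class HdiExceptionDemo {
    public static void main(String[] args) {
        RuntimeException cause = new IllegalStateException("boom");
        HDIException wrapped = new HDIException("Cluster call failed", cause);
        // Prints the stack trace of the IllegalStateException:
        System.out.println(wrapped.getErrorLog());
    }
}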

src/main/java/com/microsoft/azure/spark/tools/errors/SparkJobException.java Normal file

@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.errors;
/**
* The Base Exception for all Spark Job related exceptions.
*/
public class SparkJobException extends HDIException {
public SparkJobException(String message) {
super(message);
}
public SparkJobException(String message, int errorCode) {
super(message, errorCode);
}
public SparkJobException(String message, String errorLog) {
super(message, errorLog);
}
public SparkJobException(String message, Throwable throwable) {
super(message, throwable);
}
}

src/main/java/com/microsoft/azure/spark/tools/errors/SparkJobFinishedException.java Normal file

@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.errors;
/**
* The exception thrown when probing the status of a running Spark batch job that has already finished.
*/
public class SparkJobFinishedException extends HDIException {
public SparkJobFinishedException(String message) {
super(message);
}
public SparkJobFinishedException(String message, int errorCode) {
super(message, errorCode);
}
public SparkJobFinishedException(String message, String errorLog) {
super(message, errorLog);
}
public SparkJobFinishedException(String message, Throwable throwable) {
super(message, throwable);
}
}

src/main/java/com/microsoft/azure/spark/tools/errors/SparkJobNotConfiguredException.java Normal file

@@ -0,0 +1,12 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.errors;
import java.net.UnknownServiceException;
public class SparkJobNotConfiguredException extends UnknownServiceException {
public SparkJobNotConfiguredException(String message) {
super(message);
}
}

src/main/java/com/microsoft/azure/spark/tools/errors/SparkJobUploadArtifactException.java Normal file

@@ -0,0 +1,10 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.errors;
public class SparkJobUploadArtifactException extends SparkJobException {
public SparkJobUploadArtifactException(String message, Throwable cause) {
super(message, cause);
}
}

src/main/java/com/microsoft/azure/spark/tools/events/MessageInfoType.java Normal file

@@ -0,0 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.events;
public enum MessageInfoType {
Error,
Info,
Log,
Warning,
Hyperlink,
HyperlinkWithText
}

src/main/java/com/microsoft/azure/spark/tools/events/SparkBatchJobSubmissionEvent.java Normal file

@@ -0,0 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.events;
public interface SparkBatchJobSubmissionEvent {
}

src/main/java/com/microsoft/azure/spark/tools/events/SparkBatchJobSubmittedEvent.java Normal file

@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.events;
import com.microsoft.azure.spark.tools.job.SparkBatchJob;
public class SparkBatchJobSubmittedEvent implements SparkBatchJobSubmissionEvent {
private SparkBatchJob job;
public SparkBatchJobSubmittedEvent(SparkBatchJob job) {
this.job = job;
}
public SparkBatchJob getJob() {
return job;
}
}

src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java Normal file

@@ -0,0 +1,776 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.job;
import com.gargoylesoftware.htmlunit.BrowserVersion;
import com.gargoylesoftware.htmlunit.Cache;
import com.gargoylesoftware.htmlunit.ScriptException;
import com.gargoylesoftware.htmlunit.WebClient;
import com.gargoylesoftware.htmlunit.html.DomElement;
import com.gargoylesoftware.htmlunit.html.HtmlAnchor;
import com.gargoylesoftware.htmlunit.html.HtmlPage;
import com.gargoylesoftware.htmlunit.html.HtmlTableBody;
import com.microsoft.azure.spark.tools.clusters.LivyCluster;
import com.microsoft.azure.spark.tools.errors.SparkJobException;
import com.microsoft.azure.spark.tools.events.MessageInfoType;
import com.microsoft.azure.spark.tools.log.Logger;
import com.microsoft.azure.spark.tools.restapi.HttpResponse;
import com.microsoft.azure.spark.tools.restapi.ObjectConvertUtils;
import com.microsoft.azure.spark.tools.restapi.SparkBatchJobState;
import com.microsoft.azure.spark.tools.restapi.SparkBatchSubmission;
import com.microsoft.azure.spark.tools.restapi.SparkJobLog;
import com.microsoft.azure.spark.tools.restapi.SparkSubmissionParameter;
import com.microsoft.azure.spark.tools.restapi.SparkSubmitResponse;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
import org.checkerframework.checker.nullness.qual.Nullable;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownServiceException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.microsoft.azure.spark.tools.events.MessageInfoType.Info;
import static com.microsoft.azure.spark.tools.events.MessageInfoType.Log;
import static java.lang.Thread.sleep;
//@SuppressWarnings("argument.type.incompatible")
public class LivySparkBatch implements SparkBatchJob, Logger {
public static final String WebHDFSPathPattern = "^(https?://)([^/]+)(/.*)?(/webhdfs/v1)(/.*)?$";
public static final String AdlsPathPattern = "^adl://([^/.\\s]+\\.)+[^/.\\s]+(/[^/.\\s]+)*/?$";
@Nullable
private String currentLogUrl;
private Observer<SimpleImmutableEntry<MessageInfoType, String>> ctrlSubject;
@Nullable
private String getCurrentLogUrl() {
return currentLogUrl;
}
private void setCurrentLogUrl(@Nullable String currentLogUrl) {
this.currentLogUrl = currentLogUrl;
}
/**
* The base connection URI for HDInsight Spark Job service, such as: http://livy:8998/batches.
*/
@Nullable
private URI connectUri;
/**
* The base connection URI for HDInsight Yarn application service, such as: http://hn0-spark2:8088/cluster/app.
*/
@Nullable
private URI yarnConnectUri;
/**
* The LIVY Spark batch job ID obtained from the job submission.
*/
private int batchId;
/**
* The Spark Batch Job submission parameter.
*/
protected SparkSubmissionParameter submissionParameter;
/**
* The Spark Batch Job submission for RestAPI transaction.
*/
private SparkBatchSubmission submission;
/**
* The setting of maximum retry count in RestAPI calling.
*/
private int retriesMax = 3;
/**
* The setting of delay seconds between tries in RestAPI calling.
*/
private int delaySeconds = 10;
/**
* The global cache for fetched Yarn UI page by browser.
*/
private Cache globalCache = new Cache();
private LivyCluster cluster;
/**
* Access token used for uploading files to ADLS storage account.
*/
@Nullable
private String accessToken;
@Nullable
private String destinationRootPath;
public LivySparkBatch(
LivyCluster cluster,
SparkSubmissionParameter submissionParameter,
SparkBatchSubmission sparkBatchSubmission,
Observer<SimpleImmutableEntry<MessageInfoType, String>> ctrlSubject) {
this(cluster, submissionParameter, sparkBatchSubmission, ctrlSubject, null, null);
}
public LivySparkBatch(
LivyCluster cluster,
SparkSubmissionParameter submissionParameter,
SparkBatchSubmission sparkBatchSubmission,
Observer<SimpleImmutableEntry<MessageInfoType, String>> ctrlSubject,
@Nullable String accessToken,
@Nullable String destinationRootPath) {
this.cluster = cluster;
this.submissionParameter = submissionParameter;
this.submission = sparkBatchSubmission;
this.ctrlSubject = ctrlSubject;
this.accessToken = accessToken;
this.destinationRootPath = destinationRootPath;
}
/**
* Getter of Spark Batch Job submission parameter.
*
* @return the instance of Spark Batch Job submission parameter
*/
public SparkSubmissionParameter getSubmissionParameter() {
return submissionParameter;
}
/**
* Getter of the Spark Batch Job submission for RestAPI transaction.
*
* @return the Spark Batch Job submission
*/
public SparkBatchSubmission getSubmission() {
return submission;
}
/**
* Getter of the base connection URI for HDInsight Spark Job service.
*
* @return the base connection URI for HDInsight Spark Job service
*/
@Override
public URI getConnectUri() {
return URI.create(getCluster().getLivyBatchUrl());
}
public LivyCluster getCluster() {
return cluster;
}
/**
* Getter of the LIVY Spark batch job ID obtained from the job submission.
*
* @return the LIVY Spark batch job ID
*/
@Override
public int getBatchId() {
return batchId;
}
/**
* Setter of the LIVY Spark batch job ID obtained from the job submission.
*
* @param batchId the LIVY Spark batch job ID
*/
protected void setBatchId(int batchId) {
this.batchId = batchId;
}
/**
* Getter of the maximum retry count in RestAPI calling.
*
* @return the maximum retry count in RestAPI calling
*/
@Override
public int getRetriesMax() {
return retriesMax;
}
/**
* Setter of the maximum retry count in RestAPI calling.
*
* @param retriesMax the maximum retry count in RestAPI calling
*/
@Override
public void setRetriesMax(int retriesMax) {
this.retriesMax = retriesMax;
}
/**
* Getter of the delay seconds between tries in RestAPI calling.
*
* @return the delay seconds between tries in RestAPI calling
*/
@Override
public int getDelaySeconds() {
return delaySeconds;
}
/**
* Setter of the delay seconds between tries in RestAPI calling.
*
* @param delaySeconds the delay seconds between tries in RestAPI calling
*/
@Override
public void setDelaySeconds(int delaySeconds) {
this.delaySeconds = delaySeconds;
}
/**
* Create a batch Spark job.
*
* @return the current instance for chain calling
* @throws IOException for network connection issues
*/
private LivySparkBatch createBatchJob()
throws IOException {
// Submit the batch job
HttpResponse httpResponse = this.getSubmission().createBatchSparkJob(
getConnectUri().toString(), this.getSubmissionParameter());
// Get the batch ID from response and save it
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
this.setBatchId(jobResp.getId());
return this;
}
throw new UnknownServiceException(String.format(
"Failed to submit Spark batch job. error code: %d, type: %s, reason: %s.",
httpResponse.getCode(), httpResponse.getContent(), httpResponse.getMessage()));
}
/**
* Kill the batch job specified by ID.
*
* @return the current instance for chain calling
*/
@Override
public Observable<? extends SparkBatchJob> killBatchJob() {
return Observable.fromCallable(() -> {
HttpResponse deleteResponse = this.getSubmission().killBatchJob(
this.getConnectUri().toString(), this.getBatchId());
if (deleteResponse.getCode() > 300) {
throw new UnknownServiceException(String.format(
"Failed to stop spark job. error code: %d, reason: %s.",
deleteResponse.getCode(), deleteResponse.getContent()));
}
return this;
});
}
/**
* Get Spark Job Yarn application state with retries.
*
* @return the retrieved Yarn application state
* @throws IOException on errors in the REST transaction
*/
public String getState() throws IOException {
int retries = 0;
do {
try {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
this.getConnectUri().toString(), batchId);
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
return jobResp.getState();
}
} catch (IOException e) {
log().debug("Got exception " + e.toString() + ", waiting for a while to try", e);
}
try {
// Retry interval
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
} catch (InterruptedException ex) {
throw new IOException("Interrupted in retry attempting", ex);
}
} while (++retries < this.getRetriesMax());
throw new UnknownServiceException(
"Failed to get job state: Unknown service error after " + --retries + " retries");
}
/**
* Get Spark Job Yarn application ID with retries.
*
* @param batchBaseUri the connection URI
* @param livyBatchId the Livy batch job ID
* @return the retrieved Yarn application ID
* @throws IOException on errors in the REST transaction
*/
String getSparkJobApplicationId(URI batchBaseUri, int livyBatchId) throws IOException {
int retries = 0;
do {
try {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
batchBaseUri.toString(), livyBatchId);
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
if (jobResp.getAppId() != null) {
return jobResp.getAppId();
}
}
} catch (IOException e) {
log().debug("Got exception " + e.toString() + ", waiting for a while to retry", e);
}
try {
// Retry interval
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
} catch (InterruptedException ex) {
throw new IOException("Interrupted in retry attempting", ex);
}
} while (++retries < this.getRetriesMax());
throw new UnknownServiceException(
"Failed to get job Application ID: Unknown service error after " + --retries + " retries");
}
/**
* New RxAPI: Get current job application Id.
*
* @return Application Id Observable
*/
Observable<String> getSparkJobApplicationIdObservable() {
return Observable.fromCallable(() -> {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
getConnectUri().toString(), getBatchId());
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
return jobResp.getAppId();
}
throw new UnknownServiceException("Can't get Spark Application Id");
});
}
/**
* New RxAPI: Get the current Spark job Yarn application attempt containers.
*
* @return The string pair Observable of Host and Container Id
*/
Observable<SimpleImmutableEntry<URI, String>> getSparkJobYarnContainersObservable(AppAttempt appAttempt) {
return loadPageByBrowserObservable(getConnectUri().resolve("/yarnui/hn/cluster/appattempt/")
.resolve(appAttempt.getAppAttemptId()).toString())
.retry(getRetriesMax())
.repeatWhen(ob -> ob.delay(getDelaySeconds(), TimeUnit.SECONDS))
.filter(this::isSparkJobYarnAppAttemptNotJustLaunched)
// Get the container table by XPath
.map(htmlPage -> htmlPage.getFirstByXPath("//*[@id=\"containers\"]/tbody"))
.filter(Objects::nonNull) // May get null in the last step
.map(HtmlTableBody.class::cast)
.map(HtmlTableBody::getRows) // To container rows
.buffer(2, 1)
// Wait for last two refreshes getting the same rows count, which means the yarn application
// launching containers finished
.takeUntil(buf -> buf.size() == 2 && buf.get(0).size() == buf.get(1).size())
.filter(buf -> buf.size() == 2 && buf.get(0).size() == buf.get(1).size())
.map(buf -> buf.get(1))
.flatMap(Observable::from) // From rows to row one by one
.filter(containerRow -> {
try {
// Read container URL from YarnUI page
String urlFromPage = ((HtmlAnchor) containerRow.getCell(3).getFirstChild()).getHrefAttribute();
URI containerUri = getConnectUri().resolve(urlFromPage);
return loadPageByBrowserObservable(containerUri.toString())
.map(this::isSparkJobYarnContainerLogAvailable)
.toBlocking()
.singleOrDefault(false);
} catch (Exception ignore) {
return false;
}
})
.map(row -> {
URI hostUrl = URI.create(row.getCell(1).getTextContent().trim());
String containerId = row.getCell(0).getTextContent().trim();
return new SimpleImmutableEntry<>(hostUrl, containerId);
});
}
/*
* Parsing the Application Attempt HTML page to determine if the attempt is running
*/
private Boolean isSparkJobYarnAppAttemptNotJustLaunched(HtmlPage htmlPage) {
// Get the info table by XPath
@Nullable
HtmlTableBody infoBody = htmlPage.getFirstByXPath("//*[@class=\"info\"]/tbody");
if (infoBody == null) {
return false;
}
return infoBody
.getRows()
.stream()
.filter(row -> row.getCells().size() >= 2)
.filter(row -> row.getCell(0)
.getTextContent()
.trim()
.toLowerCase()
.equals("application attempt state:"))
.map(row -> !row.getCell(1)
.getTextContent()
.trim()
.toLowerCase()
.equals("launched"))
.findFirst()
.orElse(false);
}
private Boolean isSparkJobYarnContainerLogAvailable(HtmlPage htmlPage) {
Optional<DomElement> firstContent = Optional.ofNullable(
htmlPage.getFirstByXPath("//*[@id=\"layout\"]/tbody/tr/td[2]"));
return firstContent.map(DomElement::getTextContent)
.map(line -> !line.trim()
.toLowerCase()
.contains("no logs available"))
.orElse(false);
}
private Observable<HtmlPage> loadPageByBrowserObservable(String url) {
final WebClient HTTP_WEB_CLIENT = new WebClient(BrowserVersion.CHROME);
HTTP_WEB_CLIENT.setCache(globalCache);
if (getSubmission().getCredentialsProvider() != null) {
HTTP_WEB_CLIENT.setCredentialsProvider(getSubmission().getCredentialsProvider());
}
return Observable.create(ob -> {
try {
ob.onNext(HTTP_WEB_CLIENT.getPage(url));
ob.onCompleted();
} catch (ScriptException e) {
log().debug("get Spark job Yarn attempts detail browser rendering Error", e);
} catch (IOException e) {
ob.onError(e);
}
});
}
/**
* Get Spark Job driver log URL with retries.
*
* @deprecated
* The Livy REST API only returns the driver log URL field for running jobs.
* Please use getSparkJobDriverLogUrlObservable() instead, which supports RxJava.
*
* @param batchBaseUri the connection URI
* @param livyBatchId the Livy batch job ID
* @return the Spark Job driver log URL
* @throws IOException on errors in the REST transaction
*/
@Nullable
@Deprecated
public String getSparkJobDriverLogUrl(URI batchBaseUri, int livyBatchId) throws IOException {
int retries = 0;
do {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
batchBaseUri.toString(), livyBatchId);
try {
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
String driverLogUrl = (String) jobResp.getAppInfo().get("driverLogUrl");
if (jobResp.getAppId() != null && driverLogUrl != null) {
return driverLogUrl;
}
}
} catch (IOException e) {
log().debug("Got exception " + e.toString() + ", waiting for a while to retry", e);
}
try {
// Retry interval
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
} catch (InterruptedException ex) {
throw new IOException("Interrupted in retry attempting", ex);
}
} while (++retries < this.getRetriesMax());
throw new UnknownServiceException(
"Failed to get job driver log URL: Unknown service error after " + --retries + " retries");
}
@Override
public Observable<SimpleImmutableEntry<MessageInfoType, String>> getSubmissionLog() {
// Those lines are carried per response,
// if there is no value followed, the line should not be sent to console
final Set<String> ignoredEmptyLines = new HashSet<>(Arrays.asList(
"stdout:",
"stderr:",
"yarn diagnostics:"));
return Observable.create(ob -> {
try {
int start = 0;
final int maxLinesPerGet = 128;
int linesGot;
boolean isSubmitting = true;
while (isSubmitting) {
boolean isAppIdAllocated = !this.getSparkJobApplicationIdObservable().isEmpty()
.toBlocking()
.lastOrDefault(true);
String logUrl = String.format("%s/%d/log?from=%d&size=%d",
this.getConnectUri().toString(), batchId, start, maxLinesPerGet);
HttpResponse httpResponse = this.getSubmission().getHttpResponseViaGet(logUrl);
SparkJobLog sparkJobLog = ObjectConvertUtils.convertJsonToObject(httpResponse.getMessage(),
SparkJobLog.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark log response: " + httpResponse.getMessage()));
// To subscriber
sparkJobLog.getLog().stream()
.filter(line -> !ignoredEmptyLines.contains(line.trim().toLowerCase()))
.forEach(line -> ob.onNext(new SimpleImmutableEntry<>(Log, line)));
linesGot = sparkJobLog.getLog().size();
start += linesGot;
// Retry interval
if (linesGot == 0) {
isSubmitting = this.getState().equals("starting") && !isAppIdAllocated;
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
}
}
} catch (IOException ex) {
ob.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, ex.getMessage()));
} catch (InterruptedException ignored) {
} finally {
ob.onCompleted();
}
});
}
public boolean isActive() throws IOException {
int retries = 0;
do {
try {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
this.getConnectUri().toString(), batchId);
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
return jobResp.isAlive();
}
} catch (IOException e) {
log().debug("Got exception " + e.toString() + ", waiting for a while to retry", e);
}
try {
// Retry interval
sleep(TimeUnit.SECONDS.toMillis(this.getDelaySeconds()));
} catch (InterruptedException ex) {
throw new IOException("Interrupted in retry attempting", ex);
}
} while (++retries < this.getRetriesMax());
throw new UnknownServiceException(
"Failed to detect job activity: Unknown service error after " + --retries + " retries");
}
protected Observable<SimpleImmutableEntry<String, String>> getJobDoneObservable() {
return Observable.create((Subscriber<? super SimpleImmutableEntry<String, String>> ob) -> {
try {
boolean isJobActive;
SparkBatchJobState state = SparkBatchJobState.NOT_STARTED;
String diagnostics = "";
do {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
this.getConnectUri().toString(), batchId);
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
SparkSubmitResponse jobResp = ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
state = SparkBatchJobState.valueOf(jobResp.getState().toUpperCase());
diagnostics = String.join("\n", jobResp.getLog());
isJobActive = !isDone(state.toString());
} else {
isJobActive = false;
}
// Retry interval
sleep(1000);
} while (isJobActive);
ob.onNext(new SimpleImmutableEntry<>(state.toString(), diagnostics));
ob.onCompleted();
} catch (IOException ex) {
ob.onError(ex);
} catch (InterruptedException ignored) {
ob.onCompleted();
}
});
}
@Override
public Observer<SimpleImmutableEntry<MessageInfoType, String>> getCtrlSubject() {
return ctrlSubject;
}
/**
* The method deploys the artifact to the cluster (not supported for the Livy Spark batch job).
*
* @param artifactPath the artifact to deploy
* @return the observable error since not support deploy yet
*/
@Override
public Observable<? extends SparkBatchJob> deploy(String artifactPath) {
return Observable.error(new UnsupportedOperationException());
}
/**
* New RxAPI: Submit the job.
*
* @return Spark Job observable
*/
@Override
public Observable<? extends SparkBatchJob> submit() {
return Observable.fromCallable(() -> createBatchJob());
}
@Override
public boolean isDone(String state) {
switch (SparkBatchJobState.valueOf(state.toUpperCase())) {
case SHUTTING_DOWN:
case ERROR:
case DEAD:
case SUCCESS:
return true;
case NOT_STARTED:
case STARTING:
case RUNNING:
case RECOVERING:
case BUSY:
case IDLE:
default:
return false;
}
}
@Override
public boolean isRunning(String state) {
return SparkBatchJobState.valueOf(state.toUpperCase()) == SparkBatchJobState.RUNNING;
}
@Override
public boolean isSuccess(String state) {
return SparkBatchJobState.valueOf(state.toUpperCase()) == SparkBatchJobState.SUCCESS;
}
/**
* New RxAPI: Get the job status (from livy).
*
* @return Spark Job observable
*/
public Observable<? extends SparkSubmitResponse> getStatus() {
return Observable.fromCallable(() -> {
HttpResponse httpResponse = this.getSubmission().getBatchSparkJobStatus(
this.getConnectUri().toString(), getBatchId());
if (httpResponse.getCode() >= 200 && httpResponse.getCode() < 300) {
return ObjectConvertUtils.convertJsonToObject(
httpResponse.getMessage(), SparkSubmitResponse.class)
.orElseThrow(() -> new UnknownServiceException(
"Bad spark job response: " + httpResponse.getMessage()));
}
throw new SparkJobException("Can't get cluster " + getSubmissionParameter().getClusterName() + " status.");
});
}
@Override
public Observable<String> awaitStarted() {
return getStatus()
.map(status -> new SimpleImmutableEntry<>(status.getState(), String.join("\n", status.getLog())))
.retry(getRetriesMax())
.repeatWhen(ob -> ob
.doOnNext(ignored -> {
getCtrlSubject().onNext(new SimpleImmutableEntry<>(Info, "The Spark job is starting..."));
})
.delay(getDelaySeconds(), TimeUnit.SECONDS)
)
.takeUntil(stateLogPair -> isDone(stateLogPair.getKey()) || isRunning(stateLogPair.getKey()))
.filter(stateLogPair -> isDone(stateLogPair.getKey()) || isRunning(stateLogPair.getKey()))
.flatMap(stateLogPair -> {
if (isDone(stateLogPair.getKey()) && !isSuccess(stateLogPair.getKey())) {
return Observable.error(new SparkJobException(
"The Spark job failed to start due to " + stateLogPair.getValue()));
}
return Observable.just(stateLogPair.getKey());
});
}
@Override
public Observable<SimpleImmutableEntry<String, String>> awaitDone() {
return getJobDoneObservable();
}
@Override
public Observable<String> awaitPostDone() {
// return getJobLogAggregationDoneObservable();
return Observable.empty();
}
}
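To show how the pieces fit together, here is a hedged sketch of submitting a job and blocking until it starts. LivyCluster has a single abstract method, so a lambda suffices; the construction of SparkSubmissionParameter and SparkBatchSubmission is taken as given, since it is outside this excerpt, and the demo class, method, and host names are made up:

package com.microsoft.azure.spark.tools.demo;

import com.microsoft.azure.spark.tools.clusters.LivyCluster;
import com.microsoft.azure.spark.tools.events.MessageInfoType;
import com.microsoft.azure.spark.tools.job.LivySparkBatch;
import com.microsoft.azure.spark.tools.job.SparkBatchJob;
import com.microsoft.azure.spark.tools.restapi.SparkBatchSubmission;
import com.microsoft.azure.spark.tools.restapi.SparkSubmissionParameter;
import rx.Observer;
import rx.observers.Observers;

import java.util.AbstractMap.SimpleImmutableEntry;

public class SubmitDemo {
    public static void run(SparkSubmissionParameter parameter, SparkBatchSubmission submission) {
        LivyCluster cluster = () -> "https://my-cluster.azurehdinsight.net/livy/";

        // Control messages (Info/Log/Error...) are simply echoed to stdout here.
        Observer<SimpleImmutableEntry<MessageInfoType, String>> ctrl =
                Observers.create(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));

        SparkBatchJob job = new LivySparkBatch(cluster, parameter, submission, ctrl);

        job.submit()
                .flatMap(SparkBatchJob::awaitStarted)
                .toBlocking()
                .forEach(state -> System.out.println("Job started, state: " + state));
    }
}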

src/main/java/com/microsoft/azure/spark/tools/job/SparkBatchJob.java Normal file

@@ -0,0 +1,153 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.job;
import com.microsoft.azure.spark.tools.events.MessageInfoType;
import rx.Observable;
import rx.Observer;
import java.net.URI;
import java.util.AbstractMap.SimpleImmutableEntry;
public interface SparkBatchJob {
/**
* Getter of the base connection URI for HDInsight Spark Job service.
*
* @return the base connection URI for HDInsight Spark Job service
*/
URI getConnectUri();
/**
* Getter of the LIVY Spark batch job ID obtained from the job submission.
*
* @return the LIVY Spark batch job ID
*/
int getBatchId();
/**
* Getter of the maximum retry count in RestAPI calling.
*
* @return the maximum retry count in RestAPI calling
*/
int getRetriesMax();
/**
* Setter of the maximum retry count in RestAPI calling.
*
* @param retriesMax the maximum retry count in RestAPI calling
*/
void setRetriesMax(int retriesMax);
/**
* Getter of the delay seconds between tries in RestAPI calling.
*
* @return the delay seconds between tries in RestAPI calling
*/
int getDelaySeconds();
/**
* Setter of the delay seconds between tries in RestAPI calling.
*
* @param delaySeconds the delay seconds between tries in RestAPI calling
*/
void setDelaySeconds(int delaySeconds);
/**
* Kill the batch job specified by ID.
*
* @return the current instance observable for chain calling,
* Observable Error: IOException for network connection issues
*/
Observable<? extends SparkBatchJob> killBatchJob();
/**
* Get Spark batch job driver host by ID.
*
* @return Spark driver node host observable
* Observable Error: IOException exceptions for the driver host not found
*/
// Observable<String> getSparkDriverHost();
/**
* Get Spark job driver log observable.
*
* @param type the log type, such as `stderr`, `stdout`
* @param logOffset the log offset that fetching would start from
* @param size the fetching size, -1 for all.
* @return the log and its starting offset pair observable
*/
// Observable<SimpleImmutableEntry<String, Long>> getDriverLog(String type, long logOffset, int size);
/**
* Get Spark job submission log observable.
*
* @return the log type and content pair observable
*/
Observable<SimpleImmutableEntry<MessageInfoType, String>> getSubmissionLog();
/**
* Await the job started observable.
*
* @return the job state string
*/
Observable<String> awaitStarted();
/**
* Await the job done observable.
*
* @return the job state string and its diagnostics message
*/
Observable<SimpleImmutableEntry<String, String>> awaitDone();
/**
* Await the job post actions done, such as the log aggregation.
* @return the job post action status string
*/
Observable<String> awaitPostDone();
/**
* Get the job control messages observable.
*
* @return the job control message type and content pair observable
*/
Observer<SimpleImmutableEntry<MessageInfoType, String>> getCtrlSubject();
/**
* Deploy the job artifact into cluster.
*
* @param artifactPath the artifact to deploy
* @return ISparkBatchJob observable
* Observable Error: IOException;
*/
Observable<? extends SparkBatchJob> deploy(String artifactPath);
/**
* Create a batch Spark job and submit the job into cluster.
*
* @return ISparkBatchJob observable
* Observable Error: IOException;
*/
Observable<? extends SparkBatchJob> submit();
/**
* Is the job done, either with success or failure.
*
* @return true when the job is in a terminal state (success or failure)
*/
boolean isDone(String state);
/**
* Is the job running.
*
* @return true for running
*/
boolean isRunning(String state);
/**
* Is the job finished with success.
*
* @return true for success
*/
boolean isSuccess(String state);
}

src/main/java/com/microsoft/azure/spark/tools/job/SparkDriverLog.java Normal file

@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.job;
import rx.Observable;
import java.net.URI;
import java.util.AbstractMap;
interface SparkDriverLog {
URI getYarnNMConnectUri();
Observable<String> getDriverHost();
Observable<AbstractMap.SimpleImmutableEntry<String, Long>> getDriverLog(String type, long logOffset, int size);
}

src/main/java/com/microsoft/azure/spark/tools/job/YarnSparkApplicationDriverLog.java Normal file

@@ -0,0 +1,382 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.job;
import com.gargoylesoftware.htmlunit.BrowserVersion;
import com.gargoylesoftware.htmlunit.Cache;
import com.gargoylesoftware.htmlunit.FailingHttpStatusCodeException;
import com.gargoylesoftware.htmlunit.WebClient;
import com.gargoylesoftware.htmlunit.html.DomElement;
import com.gargoylesoftware.htmlunit.html.DomNode;
import com.gargoylesoftware.htmlunit.html.HtmlPage;
import com.gargoylesoftware.htmlunit.html.HtmlParagraph;
import com.gargoylesoftware.htmlunit.html.HtmlPreformattedText;
import com.microsoft.azure.spark.tools.clusters.YarnCluster;
import com.microsoft.azure.spark.tools.log.Logger;
import com.microsoft.azure.spark.tools.restapi.HttpResponse;
import com.microsoft.azure.spark.tools.restapi.ObjectConvertUtils;
import com.microsoft.azure.spark.tools.restapi.SparkBatchSubmission;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttempt;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppAttemptsResponse;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.AppResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.CredentialsProvider;
import org.checkerframework.checker.nullness.qual.Nullable;
import rx.Observable;
import java.io.IOException;
import java.net.URI;
import java.net.UnknownServiceException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static rx.exceptions.Exceptions.propagate;
/**
* The class supports fetching the Spark driver log from the Yarn application UI.
*/
public class YarnSparkApplicationDriverLog implements SparkDriverLog, Logger {
/**
* A {@code DriverLogConversionMode} enumerates the ways of combining a Yarn log UI URI.
*/
private enum DriverLogConversionMode {
WITHOUT_PORT,
WITH_PORT,
ORIGINAL;
public static DriverLogConversionMode next(final DriverLogConversionMode current) {
List<DriverLogConversionMode> modes = Arrays.asList(DriverLogConversionMode.values());
int found = modes.indexOf(current);
return found + 1 >= modes.size() ? ORIGINAL : modes.get(found + 1);
}
}
private final URI yarnNMConnectUri;
@Nullable
private String currentLogUrl;
private DriverLogConversionMode logUriConversionMode = DriverLogConversionMode.WITHOUT_PORT;
private final Cache globalCache;
private final String applicationId;
private final YarnCluster cluster;
private final SparkBatchSubmission submission;
public YarnSparkApplicationDriverLog(String applicationId, YarnCluster cluster, SparkBatchSubmission submission) {
this.applicationId = applicationId;
this.cluster = cluster;
this.submission = submission;
this.yarnNMConnectUri = URI.create(this.cluster.getYarnNMConnectionUrl());
this.globalCache = new Cache();
}
public URI getYarnNMConnectUri() {
return this.yarnNMConnectUri;
}
@Nullable
private String getCurrentLogUrl() {
return this.currentLogUrl;
}
private void setCurrentLogUrl(@Nullable String currentLogUrl) {
this.currentLogUrl = currentLogUrl;
}
private DriverLogConversionMode getLogUriConversionMode() {
return this.logUriConversionMode;
}
private void setLogUriConversionMode(DriverLogConversionMode mode) {
this.logUriConversionMode = mode;
}
/**
* Get the current Spark job Yarn application attempt log URI Observable.
*/
private Observable<URI> getSparkJobYarnCurrentAppAttemptLogsLink(final String appId) {
return Observable.fromCallable(() -> {
URI getYarnAppAttemptsURI = URI.create(YarnSparkApplicationDriverLog.this.getYarnNMConnectUri() + appId
+ "/appattempts");
HttpResponse httpResponse = YarnSparkApplicationDriverLog.this.submission.getHttpResponseViaGet(
getYarnAppAttemptsURI.toString());
Objects.requireNonNull(httpResponse, "httpResponse");
Optional<AppAttempt> currentAttempt = Optional.empty();
int httpResponseCode = httpResponse.getCode();
if (200 <= httpResponseCode && httpResponseCode <= 299) {
currentAttempt = ObjectConvertUtils.convertJsonToObject(httpResponse.getMessage(),
AppAttemptsResponse.class)
.flatMap(it -> it.getAppAttempts().appAttempt.stream()
.max((o1, o2) -> Integer.compare(o1.getId(), o2.getId())));
}
return currentAttempt.orElseThrow(() -> new UnknownServiceException("Bad response when getting from "
+ getYarnAppAttemptsURI + ", response " + httpResponse.getMessage()));
}).map(it -> URI.create(it.getLogsLink()));
}
private Observable<App> getSparkJobYarnApplication() {
return Observable.fromCallable(() -> {
URI getYarnClusterAppURI = URI.create(this.getYarnNMConnectUri() + this.getApplicationId());
HttpResponse httpResponse = this.submission.getHttpResponseViaGet(getYarnClusterAppURI.toString());
Objects.requireNonNull(httpResponse, "httpResponse");
Optional<App> appResponse = Optional.empty();
int httpResponseCode = httpResponse.getCode();
if (200 <= httpResponseCode && httpResponseCode <= 299) {
appResponse = ObjectConvertUtils.convertJsonToObject(httpResponse.getMessage(), AppResponse.class)
.map(AppResponse::getApp);
}
return appResponse.orElseThrow(() -> new UnknownServiceException("Bad response when getting from "
+ getYarnClusterAppURI + ", response " + httpResponse.getMessage()));
});
}
protected Observable<String> getJobLogAggregationDoneObservable() {
return this.getSparkJobYarnApplication()
.repeatWhen(ob -> ob.delay(2, TimeUnit.SECONDS))
.filter(Objects::nonNull)
.takeUntil(this::isYarnAppLogAggregationDone)
.filter(this::isYarnAppLogAggregationDone)
.map(yarnApp -> yarnApp.getLogAggregationStatus().toUpperCase());
}
private boolean isYarnAppLogAggregationDone(App yarnApp) {
switch (yarnApp.getLogAggregationStatus()) {
case "SUCCEEDED":
case "FAILED":
case "TIME_OUT":
return true;
case "DISABLED":
case "NOT_START":
case "RUNNING":
case "RUNNING_WITH_FAILURE":
default:
return false;
}
}
/**
* Get the Spark job driver log URI observable from the container.
*/
private Observable<URI> getSparkJobDriverLogUrlObservable() {
return this.getSparkJobYarnCurrentAppAttemptLogsLink(this.getApplicationId())
.filter(uri -> StringUtils.isNotBlank(uri.getHost()))
.flatMap(this::convertToPublicLogUri);
}
private Observable<Boolean> isUriValid(final URI uriProbe) {
return Observable.fromCallable(() ->
this.submission.getHttpResponseViaGet(uriProbe.toString()).getCode() < 300);
}
private Optional<URI> convertToPublicLogUri(DriverLogConversionMode mode, URI internalLogUrl) {
String normalizedPath = Optional.of(internalLogUrl.getPath()).filter(StringUtils::isNoneBlank)
.orElse("/");
URI yarnUiBase = URI.create(cluster.getYarnUIBaseUrl() + (cluster.getYarnUIBaseUrl().endsWith("/") ? "" : "/"));
switch (mode) {
case WITHOUT_PORT:
return Optional.of(yarnUiBase.resolve(String.format("%s%s", internalLogUrl.getHost(), normalizedPath)));
case WITH_PORT:
return Optional.of(yarnUiBase.resolve(String.format("%s/port/%d%s",
internalLogUrl.getHost(), internalLogUrl.getPort(), normalizedPath)));
case ORIGINAL:
return Optional.of(internalLogUrl);
default:
throw new AssertionError("Unknown DriverLogConversionMode, shouldn't be reached");
}
}
private Observable<URI> convertToPublicLogUri(final URI internalLogUri) {
// New version, without port info in log URL
return this.convertToPublicLogUri(this.getLogUriConversionMode(), internalLogUri)
.map(Observable::just)
.orElseGet(() -> {
// Probe usable driver log URI
DriverLogConversionMode probeMode = YarnSparkApplicationDriverLog.this.getLogUriConversionMode();
while (probeMode != null) {
Optional<URI> uri = this.convertToPublicLogUri(probeMode, internalLogUri)
.filter(uriProbe -> isUriValid(uriProbe).toBlocking().singleOrDefault(false));
if (uri.isPresent()) {
// Found a usable mode
YarnSparkApplicationDriverLog.this.setLogUriConversionMode(probeMode);
return Observable.just(uri.get());
}
probeMode = DriverLogConversionMode.next(probeMode);
}
// All modes were probed and all failed
return Observable.empty();
});
}
public Observable<SimpleImmutableEntry<String, Long>> getDriverLog(final String type,
final long logOffset,
final int size) {
return this.getSparkJobDriverLogUrlObservable()
.map(Object::toString)
.flatMap(logUrl -> {
long offset = logOffset;
if (!StringUtils.equalsIgnoreCase(logUrl, this.getCurrentLogUrl())) {
this.setCurrentLogUrl(logUrl);
offset = 0L;
}
String driverLogUrl = this.getCurrentLogUrl();
if (driverLogUrl == null) {
return Observable.empty();
}
String logGot = this.getInformationFromYarnLogDom(
this.submission.getCredentialsProvider(), driverLogUrl, type, offset, size);
return StringUtils.isEmpty(logGot)
? Observable.empty()
: Observable.just(new SimpleImmutableEntry<>(logGot, offset));
});
}
public Observable<String> getDriverHost() {
return this.getSparkJobYarnApplication().map(yarnApp -> {
if (yarnApp.isFinished()) {
throw propagate(new UnknownServiceException(
"The Livy job " + this.getApplicationId() + " on yarn is not running."));
}
String driverHttpAddress = yarnApp.getAmHostHttpAddress();
/*
* The sample here is:
* host.domain.com:8900
* or
* 10.0.0.15:30060
*/
String driverHost = this.parseAmHostHttpAddressHost(driverHttpAddress);
if (driverHost == null) {
throw propagate(new UnknownServiceException(
"Bad amHostHttpAddress got from /yarnui/ws/v1/cluster/apps/" + this.getApplicationId()));
}
return driverHost;
});
}
/**
* Parse the host from a host:port combination string.
*
* @param driverHttpAddress the host:port combination string to parse
* @return the parsed host, or null if the input does not match
*/
@Nullable
private String parseAmHostHttpAddressHost(@Nullable String driverHttpAddress) {
if (driverHttpAddress == null) {
return null;
} else {
Pattern driverRegex = Pattern.compile("(?<host>[^:]+):(?<port>\\d+)");
Matcher driverMatcher = driverRegex.matcher(driverHttpAddress);
return driverMatcher.matches() ? driverMatcher.group("host") : null;
}
}
public final Cache getGlobalCache() {
return this.globalCache;
}
private String getInformationFromYarnLogDom(final CredentialsProvider credentialsProvider,
final String baseUrl,
final String type,
final long start,
final int size) {
WebClient webClient = new WebClient(BrowserVersion.CHROME);
webClient.setCache(this.globalCache);
if (credentialsProvider != null) {
webClient.setCredentialsProvider(credentialsProvider);
}
URI url = URI.create(baseUrl + "/").resolve(
String.format("%s?start=%d", type, start) + (size <= 0 ? "" : String.format("&end=%d", start + size)));
try {
HtmlPage htmlPage = webClient.getPage(Objects.requireNonNull(url, "Can't get Yarn log URL").toString());
Iterator<DomElement> iterator = htmlPage.getElementById("navcell")
.getNextElementSibling()
.getChildElements()
.iterator();
HashMap<String, String> logTypeMap = new HashMap<>();
final AtomicReference<String> logType = new AtomicReference<>();
String logs = "";
while (iterator.hasNext()) {
DomElement node = iterator.next();
if (node instanceof HtmlParagraph) {
// In the history server, the log type needs to be read from a paragraph in the page
final Pattern logTypePattern = Pattern.compile("Log Type:\\s+(\\S+)");
Optional.ofNullable(node.getFirstChild())
.map(DomNode::getTextContent)
.map(StringUtils::trim)
.map(logTypePattern::matcher)
.filter(Matcher::matches)
.map(matcher -> matcher.group(1))
.ifPresent(logType::set);
} else if (node instanceof HtmlPreformattedText) {
// While the job is running, there is no log type paragraph in the page
logs = Optional.ofNullable(node.getFirstChild())
.map(DomNode::getTextContent)
.orElse("");
if (logType.get() != null) {
// Only get the first <pre>...</pre>
logTypeMap.put(logType.get(), logs);
logType.set(null);
}
}
}
return logTypeMap.getOrDefault(type, logs);
} catch (FailingHttpStatusCodeException | IOException serviceError) {
// If the URL is wrong, a 200 response will be returned with content like:
// Unable to locate 'xxx' log for container
// OR
// Logs not available for <user>. Aggregation may not be complete,
// Check back later or try the nodemanager at...
// OR
// Cannot get container logs without ...
//
// If fetching the Yarn log hits the gap between the job running and stopping, a 403 status is
// returned while the log is moving to the job history server; just wait and retry.
log().warn("Can't parse information from YarnUI log page " + url, serviceError);
}
return "";
}
public final String getApplicationId() {
return this.applicationId;
}
public final YarnCluster getCluster() {
return this.cluster;
}
}
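
A hedged construction sketch for the class above; the cluster endpoints, credentials, and application id are all made up, and YarnCluster is assumed here to be an interface exposing just the two endpoint getters this class uses.

import com.microsoft.azure.spark.tools.clusters.YarnCluster;
import com.microsoft.azure.spark.tools.job.YarnSparkApplicationDriverLog;
import com.microsoft.azure.spark.tools.restapi.SparkBatchSubmission;

public class DriverHostProbe {
    public static void main(String[] args) {
        YarnCluster cluster = new YarnCluster() {
            @Override
            public String getYarnNMConnectionUrl() {   // hypothetical endpoint
                return "https://example.azurehdinsight.net/yarnui/ws/v1/cluster/apps/";
            }

            @Override
            public String getYarnUIBaseUrl() {         // hypothetical endpoint
                return "https://example.azurehdinsight.net/yarnui/";
            }
        };
        SparkBatchSubmission submission = SparkBatchSubmission.getInstance();
        submission.setUsernamePasswordCredential("admin", "made-up-password");
        YarnSparkApplicationDriverLog driverLog = new YarnSparkApplicationDriverLog(
                "application_1554257000000_0001", cluster, submission);
        driverLog.getDriverHost().subscribe(
                host -> System.out.println("Driver host: " + host),
                err -> System.err.println("Failed: " + err.getMessage()));
    }
}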


@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.log;
import org.slf4j.LoggerFactory;
/**
* Base logger interface: any implementing class gets an SLF4J logger named after its own class.
*/
public interface Logger {
default org.slf4j.Logger log() {
return LoggerFactory.getLogger(getClass());
}
}
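
For illustration, a minimal consumer of this mixin (the class name is made up):

import com.microsoft.azure.spark.tools.log.Logger;

// Implementing Logger is enough to get a per-class SLF4J logger via log().
public class JobMonitor implements Logger {
    public void check() {
        log().info("checking job state");   // logged under the JobMonitor category
    }
}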


@ -0,0 +1,263 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.processes;
import com.google.common.net.HostAndPort;
import com.microsoft.azure.spark.tools.errors.SparkJobFinishedException;
import com.microsoft.azure.spark.tools.errors.SparkJobUploadArtifactException;
import com.microsoft.azure.spark.tools.events.MessageInfoType;
import com.microsoft.azure.spark.tools.events.SparkBatchJobSubmissionEvent;
import com.microsoft.azure.spark.tools.events.SparkBatchJobSubmittedEvent;
import com.microsoft.azure.spark.tools.job.SparkBatchJob;
import com.microsoft.azure.spark.tools.log.Logger;
import com.microsoft.azure.spark.tools.ux.IdeSchedulers;
import org.apache.commons.io.output.NullOutputStream;
import org.checkerframework.checker.nullness.qual.Nullable;
import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Optional;
import static com.microsoft.azure.spark.tools.events.MessageInfoType.Info;
public class SparkBatchJobRemoteProcess extends Process implements Logger {
private IdeSchedulers schedulers;
private String artifactPath;
private final String title;
private final PublishSubject<SimpleImmutableEntry<MessageInfoType, String>> ctrlSubject;
private SparkJobLogInputStream jobStdoutLogInputStream;
private SparkJobLogInputStream jobStderrLogInputStream;
@Nullable
private Subscription jobSubscription;
private final SparkBatchJob sparkJob;
private final PublishSubject<SparkBatchJobSubmissionEvent> eventSubject = PublishSubject.create();
private boolean isDestroyed = false;
private boolean isDisconnected;
public SparkBatchJobRemoteProcess(IdeSchedulers schedulers,
SparkBatchJob sparkJob,
String artifactPath,
String title,
PublishSubject<SimpleImmutableEntry<MessageInfoType, String>> ctrlSubject) {
this.schedulers = schedulers;
this.sparkJob = sparkJob;
this.artifactPath = artifactPath;
this.title = title;
this.ctrlSubject = ctrlSubject;
this.jobStdoutLogInputStream = new SparkJobLogInputStream("stdout");
this.jobStderrLogInputStream = new SparkJobLogInputStream("stderr");
}
/**
* Kill the remote job.
*
* @return whether the remote Spark job was killed
*/
public boolean killProcessTree() {
return false;
}
/**
* Check whether the Spark job session is connected.
*
* @return whether the Spark job log fetching session is still connected
*/
public boolean isDisconnected() {
return isDisconnected;
}
@Nullable
public HostAndPort getLocalTunnel(int i) {
return null;
}
@Override
public OutputStream getOutputStream() {
return new NullOutputStream();
}
@Override
public InputStream getInputStream() {
return jobStdoutLogInputStream;
}
@Override
public InputStream getErrorStream() {
return jobStderrLogInputStream;
}
@Override
public int waitFor() {
return 0;
}
@Override
public int exitValue() {
return 0;
}
@Override
public void destroy() {
getSparkJob().killBatchJob().subscribe(
job -> log().trace("Killed Spark batch job " + job.getBatchId()),
err -> log().warn("Got error when killing Spark batch job", err),
() -> { }
);
this.isDestroyed = true;
this.disconnect();
}
public SparkBatchJob getSparkJob() {
return sparkJob;
}
public Optional<Subscription> getJobSubscription() {
return Optional.ofNullable(jobSubscription);
}
public void start() {
// Build, deploy and wait for the job done.
// jobSubscription = prepareArtifact()
jobSubscription = Observable.just(sparkJob)
.flatMap(this::submitJob)
.flatMap(this::awaitForJobStarted)
.flatMap(this::attachInputStreams)
.flatMap(this::awaitForJobDone)
.subscribe(
sdPair -> {
if (sparkJob.isSuccess(sdPair.getKey())) {
ctrlInfo("");
ctrlInfo("========== RESULT ==========");
ctrlInfo("Job run successfully.");
} else {
ctrlInfo("");
ctrlInfo("========== RESULT ==========");
ctrlError("Job state is " + sdPair.getKey());
ctrlError("Diagnostics: " + sdPair.getValue());
}
},
err -> {
if (err instanceof SparkJobFinishedException
|| err.getCause() instanceof SparkJobFinishedException) {
// If we call destroy() when the job is dead,
// we will get an exception with a `job is finished` error message
ctrlError("Job is already finished.");
isDestroyed = true;
disconnect();
} else {
ctrlError(err.getMessage());
destroy();
}
},
() -> {
disconnect();
});
}
private Observable<? extends SparkBatchJob> awaitForJobStarted(SparkBatchJob job) {
return job.awaitStarted()
.map(state -> job);
}
private Observable<? extends SparkBatchJob> attachJobInputStream(SparkJobLogInputStream inputStream,
SparkBatchJob job) {
return Observable.just(inputStream)
.map(stream -> stream.attachJob(job))
.subscribeOn(schedulers.processBarVisibleAsync(
"Attach Spark batch job outputs " + inputStream.getLogType()));
}
public void disconnect() {
this.isDisconnected = true;
this.ctrlSubject.onCompleted();
this.eventSubject.onCompleted();
this.getJobSubscription().ifPresent(Subscription::unsubscribe);
}
protected void ctrlInfo(String message) {
ctrlSubject.onNext(new SimpleImmutableEntry<>(Info, message));
}
protected void ctrlError(String message) {
ctrlSubject.onNext(new SimpleImmutableEntry<>(MessageInfoType.Error, message));
}
public PublishSubject<SparkBatchJobSubmissionEvent> getEventSubject() {
return eventSubject;
}
protected Observable<SparkBatchJob> startJobSubmissionLogReceiver(SparkBatchJob job) {
return job.getSubmissionLog()
.doOnNext(ctrlSubject::onNext)
// "ctrlSubject::onNext" lead to uncaught exception
// while "ctrlError" only print error message in console view
.doOnError(err -> ctrlError(err.getMessage()))
.lastOrDefault(null)
.map((@Nullable SimpleImmutableEntry<MessageInfoType, String> messageTypeText) -> job);
}
// Build and deploy artifact
protected Observable<? extends SparkBatchJob> prepareArtifact() {
return getSparkJob()
.deploy(artifactPath)
.onErrorResumeNext(err -> {
Throwable rootCause = err.getCause() != null ? err.getCause() : err;
return Observable.error(new SparkJobUploadArtifactException(
"Failed to upload Spark application artifacts: " + rootCause.getMessage(), rootCause));
})
.subscribeOn(schedulers.processBarVisibleAsync("Deploy the jar file into cluster"));
}
protected Observable<? extends SparkBatchJob> submitJob(SparkBatchJob batchJob) {
return batchJob
.submit()
.subscribeOn(schedulers.processBarVisibleAsync("Submit the Spark batch job"))
.flatMap(this::startJobSubmissionLogReceiver) // To receive the Livy submission log
.doOnNext(job -> eventSubject.onNext(new SparkBatchJobSubmittedEvent(job)));
}
public IdeSchedulers getSchedulers() {
return schedulers;
}
public String getTitle() {
return title;
}
private Observable<? extends SparkBatchJob> attachInputStreams(SparkBatchJob job) {
return Observable.zip(
attachJobInputStream((SparkJobLogInputStream) getErrorStream(), job),
attachJobInputStream((SparkJobLogInputStream) getInputStream(), job),
(job1, job2) -> job);
}
Observable<SimpleImmutableEntry<String, String>> awaitForJobDone(SparkBatchJob runningJob) {
return runningJob.awaitDone()
.subscribeOn(schedulers.processBarVisibleAsync("Spark batch job " + getTitle() + " is running"))
.flatMap(jobStateDiagnosticsPair -> runningJob
.awaitPostDone()
.subscribeOn(schedulers.processBarVisibleAsync(
"Waiting for " + getTitle() + " log aggregation is done"))
.map(any -> jobStateDiagnosticsPair));
}
public PublishSubject<SimpleImmutableEntry<MessageInfoType, String>> getCtrlSubject() {
return ctrlSubject;
}
public boolean isDestroyed() {
return isDestroyed;
}
}
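
A hedged wiring sketch for the process above, assuming an existing SparkBatchJob and IdeSchedulers implementation (both environment specific and not shown); the artifact path and title are made up.

import java.util.AbstractMap.SimpleImmutableEntry;

import com.microsoft.azure.spark.tools.events.MessageInfoType;
import com.microsoft.azure.spark.tools.job.SparkBatchJob;
import com.microsoft.azure.spark.tools.processes.SparkBatchJobRemoteProcess;
import com.microsoft.azure.spark.tools.ux.IdeSchedulers;
import rx.subjects.PublishSubject;

public class RemoteProcessDemo {
    static Process launch(IdeSchedulers schedulers, SparkBatchJob job) {
        PublishSubject<SimpleImmutableEntry<MessageInfoType, String>> ctrl = PublishSubject.create();
        // Print every control message (submission log, results, errors) as it arrives.
        ctrl.subscribe(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));

        SparkBatchJobRemoteProcess process = new SparkBatchJobRemoteProcess(
                schedulers, job, "/path/to/artifact.jar", "demo job", ctrl);
        process.start();   // submit, attach stdout/stderr streams, await completion
        return process;
    }
}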


@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.processes;
import com.microsoft.azure.spark.tools.job.SparkBatchJob;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Optional;
import static java.lang.Thread.sleep;
public class SparkJobLogInputStream extends InputStream {
private String logType;
@Nullable
private SparkBatchJob sparkBatchJob;
@Nullable
private String logUrl;
private long offset = 0;
private byte[] buffer = new byte[0];
private int bufferPos;
public SparkJobLogInputStream(String logType) {
this.logType = logType;
}
public SparkBatchJob attachJob(SparkBatchJob sparkJob) {
setSparkBatchJob(sparkJob);
return sparkJob;
}
private synchronized Optional<SimpleImmutableEntry<String, Long>> fetchLog(long logOffset, int fetchSize) {
return Optional.empty();
// return getAttachedJob()
// .map(job -> job.getDriverLog(getLogType(), logOffset, fetchSize)
// .toBlocking().singleOrDefault(null));
}
void setSparkBatchJob(@Nullable SparkBatchJob sparkBatchJob) {
this.sparkBatchJob = sparkBatchJob;
}
public Optional<SparkBatchJob> getAttachedJob() {
return Optional.ofNullable(sparkBatchJob);
}
@Override
public int read() throws IOException {
if (bufferPos >= buffer.length) {
throw new IOException("Beyond the buffer end, needs a new log fetch");
}
return buffer[bufferPos++];
}
@Override
public int available() throws IOException {
if (bufferPos >= buffer.length) {
return fetchLog(offset, -1)
.map(sliceOffsetPair -> {
buffer = sliceOffsetPair.getKey().getBytes();
bufferPos = 0;
offset = sliceOffsetPair.getValue() + sliceOffsetPair.getKey().length();
return buffer.length;
}).orElseGet(() -> {
try {
sleep(3000);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
}
return 0;
});
} else {
return buffer.length - bufferPos;
}
}
void setLogUrl(@Nullable String logUrl) {
this.logUrl = logUrl;
}
public Optional<String> getLogUrl() {
return Optional.ofNullable(logUrl);
}
public String getLogType() {
return logType;
}
}
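
Since read() above throws once the buffer is exhausted and only available() refills it, a consumer has to poll available() before reading; a rough sketch:

import java.io.IOException;
import java.io.InputStream;

public class LogStreamPump {
    // Drain whatever the stream currently buffers; available() triggers the next fetch
    // (or sleeps three seconds when no new log has arrived).
    static void pumpOnce(InputStream sparkJobLogStream) throws IOException {
        int ready = sparkJobLogStream.available();
        for (int i = 0; i < ready; i++) {
            System.out.print((char) sparkJobLogStream.read());
        }
    }
}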


@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import java.util.Optional;
/**
* The interface is to provide the convert methods for JSON/XML objects.
*/
public interface Convertible {
// serialize an object to xml-format string
default Optional<String> convertToXml() {
return ObjectConvertUtils.convertObjectToXmlString(this);
}
// serialize an object to json-format string
default Optional<String> convertToJson() {
return ObjectConvertUtils.convertObjectToJsonString(this);
}
}
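
A quick usage sketch with an implementing type from this change:

import com.microsoft.azure.spark.tools.restapi.SparkSubmissionParameter;

public class ConvertibleDemo {
    public static void main(String[] args) {
        SparkSubmissionParameter parameter = new SparkSubmissionParameter();
        parameter.setName("demo");
        // An empty Optional means serialization failed.
        parameter.convertToJson().ifPresent(System.out::println);
    }
}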


@ -0,0 +1,39 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HttpResponse {
private int code;
private String message;
private Map<String, List<String>> headers;
private String content;
public HttpResponse(int code, String message, Map<String, List<String>> headers,
String content) {
this.code = code;
this.message = message;
this.headers = new HashMap<>(headers);
this.content = content;
}
public int getCode() {
return code;
}
public String getMessage() {
return message;
}
public Map<String, List<String>> getHeaders() {
return headers;
}
public String getContent() {
return content;
}
}


@ -0,0 +1,97 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.http.HttpEntity;
import org.apache.http.util.EntityUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
public final class ObjectConvertUtils {
private static JsonFactory jsonFactory = new JsonFactory();
private static ObjectMapper objectMapper = new ObjectMapper(jsonFactory);
private static XmlMapper xmlMapper = new XmlMapper();
public static <T> Optional<T> convertJsonToObject(String jsonString, Class<T> tClass) throws IOException {
return Optional.ofNullable(objectMapper.readValue(jsonString, tClass));
}
@Nullable
public static <T> T convertToObjectQuietly(String jsonString, Class<T> tClass) {
try {
return objectMapper.readValue(jsonString, tClass);
} catch (IOException e) {
// ignore the exception
}
return null;
}
public static <T> Optional<T> convertEntityToObject(HttpEntity entity, Class<T> tClass) throws IOException {
final String type = entity.getContentType().getValue().toLowerCase();
switch (type) {
case "application/json" :
return convertJsonToObject(EntityUtils.toString(entity), tClass);
case "application/xml" :
return convertXmlToObject(EntityUtils.toString(entity), tClass);
default:
}
return Optional.empty();
}
public static <T> Optional<List<T>> convertEntityToList(HttpEntity entity, Class<T> tClass) throws IOException {
final String type = entity.getContentType().getValue().toLowerCase();
switch (type) {
case "application/json" :
return convertJsonToList(EntityUtils.toString(entity), tClass);
case "application/xml" :
return convertXmlToList(EntityUtils.toString(entity), tClass);
default:
}
return Optional.empty();
}
private static <T> Optional<List<T>> convertJsonToList(String jsonString, Class<T> tClass) throws IOException {
List<T> myLists = objectMapper.readValue(
jsonString, TypeFactory.defaultInstance().constructCollectionType(List.class, tClass));
return Optional.ofNullable(myLists);
}
private static <T> Optional<List<T>> convertXmlToList(String xmlString, Class<T> tClass) throws IOException {
List<T> myLists = xmlMapper.readValue(
xmlString, TypeFactory.defaultInstance().constructCollectionType(List.class, tClass));
return Optional.ofNullable(myLists);
}
static <T> Optional<String> convertObjectToJsonString(T obj) {
try {
return Optional.ofNullable(objectMapper.writeValueAsString(obj));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return Optional.empty();
}
public static <T> Optional<String> convertObjectToXmlString(T obj) {
try {
return Optional.ofNullable(xmlMapper.writeValueAsString(obj));
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return Optional.empty();
}
private static <T> Optional<T> convertXmlToObject(String xmlString, Class<T> tClass) throws IOException {
return Optional.ofNullable(xmlMapper.readValue(xmlString, tClass));
}
}
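
For illustration, parsing a Yarn ResourceManager-style payload with the utility above; the JSON string is made up:

import java.io.IOException;

import com.microsoft.azure.spark.tools.restapi.ObjectConvertUtils;
import com.microsoft.azure.spark.tools.restapi.yarn.rm.App;

public class JsonParseDemo {
    public static void main(String[] args) throws IOException {
        String json = "{\"id\": \"application_123\", \"state\": \"RUNNING\", \"user\": \"livy\"}";
        // An empty Optional would mean the payload could not be mapped.
        ObjectConvertUtils.convertJsonToObject(json, App.class)
                .ifPresent(app -> System.out.println(app.getId() + " is " + app.getState()));
    }
}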


@ -0,0 +1,29 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
public enum SparkBatchJobState {
NOT_STARTED("not_started"),
STARTING("starting"),
RECOVERING("recovering"),
IDLE("idle"),
RUNNING("running"),
BUSY("busy"),
SHUTTING_DOWN("shutting_down"),
ERROR("error"),
DEAD("dead"),
SUCCESS("success");
private final String state;
SparkBatchJobState(String state) {
this.state = state;
}
@Override
public String toString() {
return state;
}
}


@ -0,0 +1,222 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import com.microsoft.azure.spark.tools.log.Logger;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import java.io.IOException;
public class SparkBatchSubmission implements Logger {
SparkBatchSubmission() {
}
// Singleton Instance
private static volatile SparkBatchSubmission instance = null;
public static SparkBatchSubmission getInstance() {
if (instance == null) {
synchronized (SparkBatchSubmission.class) {
if (instance == null) {
instance = new SparkBatchSubmission();
}
}
}
return instance;
}
private CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// public String getInstallationID() {
//     if (HDInsightLoader.getHDInsightHelper() == null) {
//         return "";
//     }
//
//     return HDInsightLoader.getHDInsightHelper().getInstallationId();
// }
public CloseableHttpClient getHttpClient() throws IOException {
// TrustStrategy ts = ServiceManager.getServiceProvider(TrustStrategy.class);
// SSLConnectionSocketFactory sslSocketFactory = null;
//
// if (ts != null) {
// try {
// SSLContext sslContext = new SSLContextBuilder()
// .loadTrustMaterial(ts)
// .build();
//
// sslSocketFactory = new SSLConnectionSocketFactory(sslContext,
// HttpObservable.isSSLCertificateValidationDisabled()
// ? NoopHostnameVerifier.INSTANCE
// : new DefaultHostnameVerifier());
// } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
// log().error("Prepare SSL Context for HTTPS failure. " + ExceptionUtils.getStackTrace(e));
// }
// }
return HttpClients.custom()
.useSystemProperties()
// .setSSLSocketFactory(sslSocketFactory)
.setDefaultCredentialsProvider(credentialsProvider)
.build();
}
/**
* Set the HTTP request credential using a username and password.
*
* @param username : username
* @param password : password
*/
public void setUsernamePasswordCredential(String username, String password) {
credentialsProvider.setCredentials(new AuthScope(AuthScope.ANY),
new UsernamePasswordCredentials(username, password));
}
public CredentialsProvider getCredentialsProvider() {
return credentialsProvider;
}
public HttpResponse getHttpResponseViaGet(String connectUrl) throws IOException {
CloseableHttpClient httpclient = getHttpClient();
HttpGet httpGet = new HttpGet(connectUrl);
httpGet.addHeader("Content-Type", "application/json");
httpGet.addHeader("User-Agent", getUserAgentPerRequest(false));
httpGet.addHeader("X-Requested-By", "ambari");
try (CloseableHttpResponse response = httpclient.execute(httpGet)) {
return StreamUtil.getResultFromHttpResponse(response);
}
}
public HttpResponse getHttpResponseViaHead(String connectUrl) throws IOException {
CloseableHttpClient httpclient = getHttpClient();
HttpHead httpHead = new HttpHead(connectUrl);
httpHead.addHeader("Content-Type", "application/json");
httpHead.addHeader("User-Agent", getUserAgentPerRequest(true));
httpHead.addHeader("X-Requested-By", "ambari");
// WORKAROUND: https://github.com/Microsoft/azure-tools-for-java/issues/1358
// The Ambari local account will cause Kerberos authentication initializing infinitely.
// Set a timer here to cancel the progress.
httpHead.setConfig(
RequestConfig
.custom()
.setSocketTimeout(3 * 1000)
.build());
try (CloseableHttpResponse response = httpclient.execute(httpHead)) {
return StreamUtil.getResultFromHttpResponse(response);
}
}
/**
* Generate a User-Agent header value for HTTP requests.
*
* @param isMapToInstallID true to create the relationship between the UUID and the installation ID
* @return the unique User-Agent string
*/
private String getUserAgentPerRequest(boolean isMapToInstallID) {
// String loadingClass = SparkBatchSubmission.class.getClassLoader().getClass().getName().toLowerCase();
// String requestId = AppInsightsClient.getConfigurationSessionId() == null ?
// UUID.randomUUID().toString() :
// AppInsightsClient.getConfigurationSessionId();
//
// if (isMapToInstallID) {
// new AppInsightsHttpRequestInstallIdMapRecord(requestId, getInstallationID()).post();
// }
return "Azure Spark Maven plugin";
}
/**
* Get all batch Spark jobs.
*
* @param connectUrl : eg http://localhost:8998/batches
* @return response result
* @throws IOException if the HTTP request fails
*/
public HttpResponse getAllBatchesSparkJobs(String connectUrl) throws IOException {
return getHttpResponseViaGet(connectUrl);
}
/**
* Create a batch Spark job.
*
* @param connectUrl : eg http://localhost:8998/batches
* @param submissionParameter : spark submission parameter
* @return response result
*/
public HttpResponse createBatchSparkJob(String connectUrl,
SparkSubmissionParameter submissionParameter) throws IOException {
CloseableHttpClient httpclient = getHttpClient();
HttpPost httpPost = new HttpPost(connectUrl);
httpPost.addHeader("Content-Type", "application/json");
httpPost.addHeader("User-Agent", getUserAgentPerRequest(true));
httpPost.addHeader("X-Requested-By", "ambari");
StringEntity postingString = new StringEntity(submissionParameter.serializeToJson());
httpPost.setEntity(postingString);
try (CloseableHttpResponse response = httpclient.execute(httpPost)) {
return StreamUtil.getResultFromHttpResponse(response);
}
}
/**
* Get the batch Spark job status.
*
* @param connectUrl : eg http://localhost:8998/batches
* @param batchId : batch Id
* @return response result
* @throws IOException if the HTTP request fails
*/
public HttpResponse getBatchSparkJobStatus(String connectUrl, int batchId) throws IOException {
return getHttpResponseViaGet(connectUrl + "/" + batchId);
}
/**
* Kill the batch job.
*
* @param connectUrl : eg http://localhost:8998/batches
* @param batchId : batch Id
* @return response result
* @throws IOException if the HTTP request fails
*/
public HttpResponse killBatchJob(String connectUrl, int batchId) throws IOException {
CloseableHttpClient httpclient = getHttpClient();
HttpDelete httpDelete = new HttpDelete(connectUrl + "/" + batchId);
httpDelete.addHeader("User-Agent", getUserAgentPerRequest(true));
httpDelete.addHeader("Content-Type", "application/json");
httpDelete.addHeader("X-Requested-By", "ambari");
try (CloseableHttpResponse response = httpclient.execute(httpDelete)) {
return StreamUtil.getResultFromHttpResponse(response);
}
}
/**
* Get the batch job full log.
*
* @param connectUrl : eg http://localhost:8998/batches
* @param batchId : batch Id
* @return response result
* @throws IOException if the HTTP request fails
*/
public HttpResponse getBatchJobFullLog(String connectUrl, int batchId) throws IOException {
return getHttpResponseViaGet(String.format(
"%s/%d/log?from=%d&size=%d", connectUrl, batchId, 0, Integer.MAX_VALUE));
}
}
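
A hedged end-to-end sketch for the submission client above; the Livy endpoint and credentials are hypothetical.

import java.io.IOException;

import com.microsoft.azure.spark.tools.restapi.HttpResponse;
import com.microsoft.azure.spark.tools.restapi.SparkBatchSubmission;

public class SubmissionDemo {
    public static void main(String[] args) throws IOException {
        SparkBatchSubmission submission = SparkBatchSubmission.getInstance();
        submission.setUsernamePasswordCredential("admin", "made-up-password");
        // Query the status of batch 0; the Livy URL below is made up.
        HttpResponse response = submission.getBatchSparkJobStatus(
                "https://example.azurehdinsight.net/livy/batches", 0);
        System.out.println(response.getCode() + ": " + response.getMessage());
    }
}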

Просмотреть файл

@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import java.util.List;
@SuppressWarnings("nullness")
public class SparkJobLog {
private int id;
private int from;
private int total;
private List<String> log;
public int getId() {
return id;
}
public int getFrom() {
return from;
}
public int getTotal() {
return total;
}
public List<String> getLog() {
return log;
}
}


@ -0,0 +1,273 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.checkerframework.checker.nullness.qual.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public class SparkSubmissionParameter implements Convertible {
/*
* For interactive spark job:
*
* kind The session kind (required) session kind
* proxyUser The user to impersonate that will run this session (e.g. bob) string
* jars Files to be placed on the java classpath list of paths
* pyFiles Files to be placed on the PYTHONPATH list of paths
* files Files to be placed in executor working directory list of paths
* driverMemory Memory for driver (e.g. 1000M, 2G) string
* driverCores Number of cores used by driver (YARN mode only) int
* executorMemory Memory for executor (e.g. 1000M, 2G) string
* executorCores Number of cores used by executor int
* numExecutors Number of executors (YARN mode only) int
* archives Archives to be uncompressed in the executor working directory (YARN mode only) list of paths
* queue The YARN queue to submit to (YARN mode only) string
* name Name of the application string
* conf Spark configuration properties Map of key=val
*/
private String name = "";
private String file = "";
private String className = "";
private String clusterName = "";
private boolean isLocalArtifact = false;
private String artifactName = "";
private String localArtifactPath = "";
private List<String> files = new ArrayList<>();
private List<String> jars = new ArrayList<>();
private List<String> args = new ArrayList<>();
private Map<String, Object> jobConfig = new HashMap<>();
private static final Pattern memorySizeRegex = Pattern.compile("\\d+(\\.\\d+)?[gGmM]");
public static final String DriverMemory = "driverMemory";
public static final String DriverMemoryDefaultValue = "4G";
public static final String DriverCores = "driverCores";
public static final int DriverCoresDefaultValue = 1;
public static final String ExecutorMemory = "executorMemory";
public static final String ExecutorMemoryDefaultValue = "4G";
public static final String NumExecutors = "numExecutors";
public static final int NumExecutorsDefaultValue = 5;
public static final String ExecutorCores = "executorCores";
public static final int ExecutorCoresDefaultValue = 1;
public static final String Conf = "conf"; // Spark configuration properties
public static final String NAME = "name";
public SparkSubmissionParameter() {
}
public SparkSubmissionParameter(String clusterName,
boolean isLocalArtifact,
String artifactName,
String localArtifactPath,
String filePath,
String className,
List<String> referencedFiles,
List<String> referencedJars,
List<String> args,
Map<String, Object> jobConfig) {
this.clusterName = clusterName;
this.isLocalArtifact = isLocalArtifact;
this.artifactName = artifactName;
this.localArtifactPath = localArtifactPath;
this.file = filePath;
this.className = className;
this.files = referencedFiles;
this.jars = referencedJars;
this.jobConfig = jobConfig;
this.args = args;
}
public void setName(String name) {
this.name = name;
}
public void setFile(String file) {
this.file = file;
}
public void setClassName(String className) {
this.className = className;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public void setLocalArtifact(boolean localArtifact) {
isLocalArtifact = localArtifact;
}
public void setArtifactName(String artifactName) {
this.artifactName = artifactName;
}
@JsonIgnore
public String getClusterName() {
return clusterName;
}
@JsonIgnore
public boolean isLocalArtifact() {
return isLocalArtifact;
}
@JsonIgnore
public String getArtifactName() {
return artifactName;
}
@JsonIgnore
@Nullable
public String getLocalArtifactPath() {
return localArtifactPath;
}
public void setLocalArtifactPath(String path) {
localArtifactPath = path;
}
@JsonProperty(NAME)
public String getName() {
return name;
}
@JsonProperty("file")
@Nullable
public String getFile() {
return file;
}
@JsonProperty("className")
public String getMainClassName() {
return className;
}
@JsonProperty("files")
@Nullable
public List<String> getReferencedFiles() {
return files;
}
@JsonProperty("jars")
@Nullable
public List<String> getReferencedJars() {
return jars;
}
@JsonProperty("args")
@Nullable
public List<String> getArgs() {
return args;
}
@JsonIgnore
@Nullable
public Map<String, Object> getJobConfig() {
return jobConfig;
}
public void setFilePath(String filePath) {
this.file = filePath;
}
@JsonProperty("driverMemory")
@Nullable
public String getDriverMemory() {
return (String) jobConfig.get(DriverMemory);
}
@JsonProperty("driverCores")
@Nullable
public Integer getDriverCores() {
return parseIntegerSafety(jobConfig.get(DriverCores));
}
@JsonProperty("executorMemory")
@Nullable
public String getExecutorMemory() {
return (String) jobConfig.get(ExecutorMemory);
}
@JsonProperty("executorCores")
@Nullable
public Integer getExecutorCores() {
return parseIntegerSafety(jobConfig.get(ExecutorCores));
}
@JsonProperty("numExecutors")
@Nullable
public Integer getNumExecutors() {
return parseIntegerSafety(jobConfig.get(NumExecutors));
}
@JsonProperty("conf")
@Nullable
public Map<String, String> getConf() {
Map<String, String> jobConf = new HashMap<>();
Optional.ofNullable(jobConfig.get(Conf))
.filter(Map.class::isInstance)
.map(Map.class::cast)
.ifPresent(conf -> conf.forEach((k, v) -> jobConf.put((String) k, (String) v)));
return jobConf.isEmpty() ? null : jobConf;
}
@JsonIgnore
@Nullable
private Integer parseIntegerSafety(@Nullable Object maybeInteger) {
if (maybeInteger == null) {
return null;
}
if (maybeInteger instanceof Integer) {
return (Integer) maybeInteger;
}
try {
return Integer.parseInt(maybeInteger.toString());
} catch (Exception ignored) {
return null;
}
}
public String serializeToJson() {
return convertToJson().orElse("");
}
public static final String[] parameterList = new String[] {
SparkSubmissionParameter.DriverMemory,
SparkSubmissionParameter.DriverCores,
SparkSubmissionParameter.ExecutorMemory,
SparkSubmissionParameter.ExecutorCores,
SparkSubmissionParameter.NumExecutors
};
/**
* Checks whether the key is one of Spark Job submission parameters or not.
*
* @param key the key string to check
* @return true if the key is a member of submission parameters; false otherwise
*/
public static boolean isSubmissionParameter(String key) {
return Arrays.stream(SparkSubmissionParameter.parameterList).anyMatch(key::equals);
}
}
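
A hedged construction sketch for the parameter above; the cluster name, artifact coordinates, and main class are all made up.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.microsoft.azure.spark.tools.restapi.SparkSubmissionParameter;

public class ParameterDemo {
    public static void main(String[] args) {
        Map<String, Object> jobConfig = new HashMap<>();
        jobConfig.put(SparkSubmissionParameter.DriverMemory,
                SparkSubmissionParameter.DriverMemoryDefaultValue);
        jobConfig.put(SparkSubmissionParameter.ExecutorCores, 2);

        SparkSubmissionParameter parameter = new SparkSubmissionParameter(
                "example-cluster", false, "demo-artifact", "",
                "wasbs://jars@example.blob.core.windows.net/demo.jar",
                "com.example.DemoApp",
                new ArrayList<>(), new ArrayList<>(),
                Collections.singletonList("--verbose"), jobConfig);
        System.out.println(parameter.serializeToJson());
    }
}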


@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import java.util.List;
import java.util.Map;
@SuppressWarnings("nullness")
public class SparkSubmitResponse {
private int id;
private String state;
private String appId; // The application ID
private Map<String, Object> appInfo; // The detailed application info
private List<String> log; // The log lines
public String getAppId() {
return appId;
}
public Map<String, Object> getAppInfo() {
return appInfo;
}
public List<String> getLog() {
return log;
}
public int getId() {
return id;
}
public String getState() {
return state;
}
public boolean isAlive() {
return !this.getState().equals("error")
&& !this.getState().equals("success")
&& !this.getState().equals("dead");
}
}


@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.StringEntity;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URL;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import static java.util.Objects.requireNonNull;
public class StreamUtil {
public static String getResultFromInputStream(InputStream inputStream) throws IOException {
// Use StringBuilder rather than StringBuffer since no thread safety is needed here
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
result.append(line);
}
}
return result.toString();
}
public static HttpResponse getResultFromHttpResponse(CloseableHttpResponse response) throws IOException {
int code = response.getStatusLine().getStatusCode();
String reason = response.getStatusLine().getReasonPhrase();
// Entity for HEAD is empty
HttpEntity entity = Optional.ofNullable(response.getEntity())
.orElse(new StringEntity(""));
try (InputStream inputStream = entity.getContent()) {
String responseContent = getResultFromInputStream(inputStream);
// Note: the response body is carried in the message field and the reason phrase in content
return new HttpResponse(code, responseContent, new HashMap<String, List<String>>(), reason);
}
}
public static File getResourceFile(String resource) throws IOException {
File file = null;
URL res = streamUtil.getClass().getResource(resource);
if (res == null) {
throw new FileNotFoundException("The resource " + resource + " isn't found");
}
if (res.toString().startsWith("jar:")) {
InputStream input = null;
OutputStream out = null;
try {
input = requireNonNull(streamUtil.getClass().getResourceAsStream(resource));
file = File.createTempFile(String.valueOf(new Date().getTime()), ".tmp");
out = new FileOutputStream(file);
int read;
byte[] bytes = new byte[1024];
while ((read = input.read(bytes)) != -1) {
out.write(bytes, 0, read);
}
} finally {
if (input != null) {
input.close();
}
if (out != null) {
out.flush();
out.close();
}
if (file != null) {
file.deleteOnExit();
}
}
} else {
file = new File(res.getFile());
}
return file;
}
private static StreamUtil streamUtil = new StreamUtil();
private static ClassLoader classLoader = requireNonNull(streamUtil.getClass().getClassLoader());
private static final String SPARK_SUBMISSION_FOLDER = "SparkSubmission";
}


@ -0,0 +1,371 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi.yarn.rm;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.microsoft.azure.spark.tools.restapi.Convertible;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* An application resource contains information about a particular application that was submitted to a cluster.
*
* Based on Hadoop 3.0.0, refer to
* https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Application_API
*
* Use the following URI to obtain an app object, from an application identified by the appid value.
* http://$rmHttpAddress:port/ws/v1/cluster/apps/$appid
*
* HTTP Operations Supported
* GET
*
* Query Parameters Supported
* None
*/
@JsonIgnoreProperties(ignoreUnknown = true)
@SuppressWarnings("nullness")
public class App implements Convertible {
private String id; // The application id
private String user; // The user who started the application
private String name; // The application name
private String applicationType; // The application type
private String queue; // The queue the application was submitted to
private String state; // The application state according to the ResourceManager -
// valid values are members of the YarnApplicationState enum:
// NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
private String finalStatus; // The final status of the application if finished -
// reported by the application itself - valid values are:
// UNDEFINED, SUCCEEDED, FAILED, KILLED
private float progress; // The progress of the application as a percent
private String trackingUI; // Where the tracking url is currently pointing -
// History (for history server) or ApplicationMaster
private String trackingUrl; // The web URL that can be used to track the application
private String diagnostics; // Detailed diagnostics information
private long clusterId; // The cluster id
private long startedTime; // The time in which application started (in ms since epoch)
private long finishedTime; // The time in which the application finished (in ms since epoch)
private long elapsedTime; // The elapsed time since the application started (in ms)
private String amContainerLogs; // The URL of the application master container logs
private String amHostHttpAddress; // The nodes http address of the application master
private int allocatedMB; // The sum of memory in MB allocated to the applications running containers
private int allocatedVCores; // The sum of virtual cores allocated to the applications running containers
private int runningContainers; // The number of containers currently running for the application
private long memorySeconds; // The amount of memory the application has allocated (megabyte-seconds)
private long vcoreSeconds; // The amount of CPU resources the application has allocated
// (virtual core-seconds)
private String applicationTags; // Comma separated tags of an application
private float clusterUsagePercentage; // The percentage of resources of the cluster that the app is using.
private long preemptedResourceMB; // Memory used by preempted container
private int preemptedResourceVCores; // Number of virtual cores used by preempted container
private boolean unmanagedApplication; // Is the application unmanaged.
private String priority; // Priority of the submitted application
private String logAggregationStatus; // Status of log aggregation - valid values are the members of
// the LogAggregationStatus enum:
// DISABLED, NOT_START, RUNNING,
// RUNNING_WITH_FAILURE, SUCCEEDED, FAILED, TIME_OUT
private int numNonAMContainerPreempted; // Number of standard containers preempted
private String amNodeLabelExpression; // Node Label expression which is used to identify the node on which
// applications AM container is expected to run.
private int numAMContainerPreempted; // Number of application master containers preempted
private float queueUsagePercentage; // The percentage of resources of the queue that the app is using
private List<ResourceRequest> resourceRequests; // additional for HDInsight cluster
public List<ResourceRequest> getResourceRequests() {
return resourceRequests;
}
public void setResourceRequests(List<ResourceRequest> resourceRequests) {
this.resourceRequests = resourceRequests;
}
public float getProgress() {
return progress;
}
public void setProgress(float progress) {
this.progress = progress;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public float getClusterUsagePercentage() {
return clusterUsagePercentage;
}
public void setClusterUsagePercentage(float clusterUsagePercentage) {
this.clusterUsagePercentage = clusterUsagePercentage;
}
public String getTrackingUI() {
return trackingUI;
}
public void setTrackingUI(String trackingUI) {
this.trackingUI = trackingUI;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getAmContainerLogs() {
return amContainerLogs;
}
public void setAmContainerLogs(String amContainerLogs) {
this.amContainerLogs = amContainerLogs;
}
public String getApplicationType() {
return applicationType;
}
public void setApplicationType(String applicationType) {
this.applicationType = applicationType;
}
public int getPreemptedResourceVCores() {
return preemptedResourceVCores;
}
public void setPreemptedResourceVCores(int preemptedResourceVCores) {
this.preemptedResourceVCores = preemptedResourceVCores;
}
public int getRunningContainers() {
return runningContainers;
}
public void setRunningContainers(int runningContainers) {
this.runningContainers = runningContainers;
}
public int getAllocatedMB() {
return allocatedMB;
}
public void setAllocatedMB(int allocatedMB) {
this.allocatedMB = allocatedMB;
}
public long getPreemptedResourceMB() {
return preemptedResourceMB;
}
public void setPreemptedResourceMB(long preemptedResourceMB) {
this.preemptedResourceMB = preemptedResourceMB;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public boolean isUnmanagedApplication() {
return unmanagedApplication;
}
public void setUnmanagedApplication(boolean unmanagedApplication) {
this.unmanagedApplication = unmanagedApplication;
}
public String getPriority() {
return priority;
}
public void setPriority(String priority) {
this.priority = priority;
}
public long getFinishedTime() {
return finishedTime;
}
public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
}
public int getAllocatedVCores() {
return allocatedVCores;
}
public void setAllocatedVCores(int allocatedVCores) {
this.allocatedVCores = allocatedVCores;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getLogAggregationStatus() {
return logAggregationStatus;
}
public void setLogAggregationStatus(String logAggregationStatus) {
this.logAggregationStatus = logAggregationStatus;
}
public long getVcoreSeconds() {
return vcoreSeconds;
}
public void setVcoreSeconds(long vcoreSeconds) {
this.vcoreSeconds = vcoreSeconds;
}
public int getNumNonAMContainerPreempted() {
return numNonAMContainerPreempted;
}
public void setNumNonAMContainerPreempted(int numNonAMContainerPreempted) {
this.numNonAMContainerPreempted = numNonAMContainerPreempted;
}
public long getMemorySeconds() {
return memorySeconds;
}
public void setMemorySeconds(long memorySeconds) {
this.memorySeconds = memorySeconds;
}
public long getElapsedTime() {
return elapsedTime;
}
public void setElapsedTime(long elapsedTime) {
this.elapsedTime = elapsedTime;
}
public String getAmNodeLabelExpression() {
return amNodeLabelExpression;
}
public void setAmNodeLabelExpression(String amNodeLabelExpression) {
this.amNodeLabelExpression = amNodeLabelExpression;
}
public String getAmHostHttpAddress() {
return amHostHttpAddress;
}
public void setAmHostHttpAddress(String amHostHttpAddress) {
this.amHostHttpAddress = amHostHttpAddress;
}
public String getFinalStatus() {
return finalStatus;
}
public void setFinalStatus(String finalStatus) {
this.finalStatus = finalStatus;
}
public String getTrackingUrl() {
return trackingUrl;
}
public void setTrackingUrl(String trackingUrl) {
this.trackingUrl = trackingUrl;
}
public int getNumAMContainerPreempted() {
return numAMContainerPreempted;
}
public void setNumAMContainerPreempted(int numAMContainerPreempted) {
this.numAMContainerPreempted = numAMContainerPreempted;
}
public String getApplicationTags() {
return applicationTags;
}
public void setApplicationTags(String applicationTags) {
this.applicationTags = applicationTags;
}
public long getClusterId() {
return clusterId;
}
public void setClusterId(long clusterId) {
this.clusterId = clusterId;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getDiagnostics() {
return diagnostics;
}
public void setDiagnostics(String diagnostics) {
this.diagnostics = diagnostics;
}
public long getStartedTime() {
return startedTime;
}
public void setStartedTime(long startedTime) {
this.startedTime = startedTime;
}
public float getQueueUsagePercentage() {
return queueUsagePercentage;
}
public void setQueueUsagePercentage(float queueUsagePercentage) {
this.queueUsagePercentage = queueUsagePercentage;
}
/**
* Check whether the Yarn job has finished or not.
*
* @return true if finished.
*/
public boolean isFinished() {
String stateUpper = this.getState().toUpperCase();
return stateUpper.equals("FINISHED") || stateUpper.equals("FAILED") || stateUpper.equals("KILLED");
}
/**
* Check whether the job was submitted through Livy.
*
* @return true for a Livy job.
*/
public boolean isLivyJob() {
return getUser().equalsIgnoreCase("livy");
}
public static final List<App> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<>(0));
}


@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi.yarn.rm;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown = true)
@SuppressWarnings("nullness")
public class AppAttempt {
private int id; // The app attempt id
private String nodeId; // The node id of the node the attempt ran on
private String nodeHttpAddress; // The node http address of the node the attempt ran on
private String logsLink; // The http link to the app attempt logs
private String containerId; // The id of the container for the app attempt
private long startTime; // The start time of the attempt (in ms since epoch)
private long finishedTime; // The end time of the attempt (in ms since epoch), 0 if not ended
private String blacklistedNodes; // Nodes blacklist
private String appAttemptId; // App Attempt Id
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getNodeHttpAddress() {
return nodeHttpAddress;
}
public void setNodeHttpAddress(String nodeHttpAddress) {
this.nodeHttpAddress = nodeHttpAddress;
}
public String getLogsLink() {
return logsLink;
}
public void setLogsLink(String logsLink) {
this.logsLink = logsLink;
}
public String getContainerId() {
return containerId;
}
public void setContainerId(String containerId) {
this.containerId = containerId;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getFinishedTime() {
return finishedTime;
}
public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
}
public String getBlacklistedNodes() {
return blacklistedNodes;
}
public void setBlacklistedNodes(String blacklistedNodes) {
this.blacklistedNodes = blacklistedNodes;
}
public String getAppAttemptId() {
return appAttemptId;
}
public void setAppAttemptId(String appAttemptId) {
this.appAttemptId = appAttemptId;
}
}


@ -0,0 +1,23 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi.yarn.rm;
import java.util.List;
@SuppressWarnings("nullness")
public class AppAttemptsResponse {
public class AppAttempts {
public List<AppAttempt> appAttempt;
}
private AppAttempts appAttempts;
public AppAttempts getAppAttempts() {
return appAttempts;
}
public void setAppAttempts(AppAttempts appAttempts) {
this.appAttempts = appAttempts;
}
}


@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi.yarn.rm;
import com.microsoft.azure.spark.tools.restapi.Convertible;
@SuppressWarnings("nullness")
public class AppResponse implements Convertible {
private App app;
public App getApp() {
return app;
}
public void setApp(App app) {
this.app = app;
}
public static final AppResponse EMPTY = new AppResponse();
}


@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
package com.microsoft.azure.spark.tools.restapi.yarn.rm;
import org.checkerframework.checker.nullness.qual.Nullable;
public class ApplicationMasterLogs {
@Nullable
private String stderr;
@Nullable
private String stdout;
@Nullable
private String directoryInfo;
public ApplicationMasterLogs(@Nullable String stdout, @Nullable String stderr, @Nullable String directoryInfo) {
this.stdout = stdout;
this.stderr = stderr;
this.directoryInfo = directoryInfo;
}
public ApplicationMasterLogs() {
}
@Nullable
public String getStderr() {
return stderr;
}
public void setStderr(@Nullable String stderr) {
this.stderr = stderr;
}
@Nullable
public String getStdout() {
return stdout;
}
public void setStdout(@Nullable String stdout) {
this.stdout = stdout;
}
@Nullable
public String getDirectoryInfo() {
return directoryInfo;
}
public void setDirectoryInfo(@Nullable String directoryInfo) {
this.directoryInfo = directoryInfo;
}
}


@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.microsoft.azure.spark.tools.restapi.Convertible;

@SuppressWarnings("nullness")
public class Capability implements Convertible {
    private String virtualCores;
    private String memorySize;
    private String memory;

    public String getVirtualCores() {
        return virtualCores;
    }

    public void setVirtualCores(String virtualCores) {
        this.virtualCores = virtualCores;
    }

    public String getMemorySize() {
        return memorySize;
    }

    public void setMemorySize(String memorySize) {
        this.memorySize = memorySize;
    }

    public String getMemory() {
        return memory;
    }

    public void setMemory(String memory) {
        this.memory = memory;
    }
}


@ -0,0 +1,130 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.microsoft.azure.spark.tools.restapi.Convertible;

@SuppressWarnings("nullness")
public class ClusterInfo implements Convertible {
    private String id;
    private String hadoopBuildVersion;
    private String haState;
    private String hadoopVersionBuiltOn;
    private String hadoopVersion;
    private String startedOn;
    private String resourceManagerVersion;
    private String haZooKeeperConnectionState;
    private String state;
    private String rmStateStoreName;
    private String resourceManagerVersionBuiltOn;
    private String resourceManagerBuildVersion;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getHadoopBuildVersion() {
        return hadoopBuildVersion;
    }

    public void setHadoopBuildVersion(String hadoopBuildVersion) {
        this.hadoopBuildVersion = hadoopBuildVersion;
    }

    public String getHaState() {
        return haState;
    }

    public void setHaState(String haState) {
        this.haState = haState;
    }

    public String getHadoopVersionBuiltOn() {
        return hadoopVersionBuiltOn;
    }

    public void setHadoopVersionBuiltOn(String hadoopVersionBuiltOn) {
        this.hadoopVersionBuiltOn = hadoopVersionBuiltOn;
    }

    public String getHadoopVersion() {
        return hadoopVersion;
    }

    public void setHadoopVersion(String hadoopVersion) {
        this.hadoopVersion = hadoopVersion;
    }

    public String getStartedOn() {
        return startedOn;
    }

    public void setStartedOn(String startedOn) {
        this.startedOn = startedOn;
    }

    public String getResourceManagerVersion() {
        return resourceManagerVersion;
    }

    public void setResourceManagerVersion(String resourceManagerVersion) {
        this.resourceManagerVersion = resourceManagerVersion;
    }

    public String getHaZooKeeperConnectionState() {
        return haZooKeeperConnectionState;
    }

    public void setHaZooKeeperConnectionState(String haZooKeeperConnectionState) {
        this.haZooKeeperConnectionState = haZooKeeperConnectionState;
    }

    public String getState() {
        return state;
    }

    public void setState(String state) {
        this.state = state;
    }

    public String getRmStateStoreName() {
        return rmStateStoreName;
    }

    public void setRmStateStoreName(String rmStateStoreName) {
        this.rmStateStoreName = rmStateStoreName;
    }

    public String getResourceManagerVersionBuiltOn() {
        return resourceManagerVersionBuiltOn;
    }

    public void setResourceManagerVersionBuiltOn(String resourceManagerVersionBuiltOn) {
        this.resourceManagerVersionBuiltOn = resourceManagerVersionBuiltOn;
    }

    public String getResourceManagerBuildVersion() {
        return resourceManagerBuildVersion;
    }

    public void setResourceManagerBuildVersion(String resourceManagerBuildVersion) {
        this.resourceManagerBuildVersion = resourceManagerBuildVersion;
    }
}


@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.microsoft.azure.spark.tools.restapi.Convertible;

@SuppressWarnings("nullness")
public class Priority implements Convertible {
    private String priority;

    public String getPriority() {
        return priority;
    }

    public void setPriority(String priority) {
        this.priority = priority;
    }
}


@ -0,0 +1,70 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.microsoft.azure.spark.tools.restapi.Convertible;

@SuppressWarnings("nullness")
public class ResourceRequest implements Convertible {
    private String nodeLabelExpression;
    private Priority priority;
    private String relaxLocality;
    private String numContainers;
    private Capability capability;
    private String resourceName;

    public String getNodeLabelExpression() {
        return nodeLabelExpression;
    }

    public void setNodeLabelExpression(String nodeLabelExpression) {
        this.nodeLabelExpression = nodeLabelExpression;
    }

    public Priority getPriority() {
        return priority;
    }

    public void setPriority(Priority priority) {
        this.priority = priority;
    }

    public String getRelaxLocality() {
        return relaxLocality;
    }

    public void setRelaxLocality(String relaxLocality) {
        this.relaxLocality = relaxLocality;
    }

    public String getNumContainers() {
        return numContainers;
    }

    public void setNumContainers(String numContainers) {
        this.numContainers = numContainers;
    }

    public Capability getCapability() {
        return capability;
    }

    public void setCapability(Capability capability) {
        this.capability = capability;
    }

    public String getResourceName() {
        return resourceName;
    }

    public void setResourceName(String resourceName) {
        this.resourceName = resourceName;
    }
}


@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.microsoft.azure.spark.tools.restapi.Convertible;

import java.util.List;
import java.util.Optional;

@SuppressWarnings("nullness")
public class YarnApplicationResponse implements Convertible {
    public static final YarnApplicationResponse EMPTY = new YarnApplicationResponse();

    private YarnApplications apps;

    public YarnApplications getApps() {
        return apps;
    }

    public void setApps(YarnApplications apps) {
        this.apps = apps;
    }

    public Optional<List<App>> getAllApplication() {
        if (apps != null) {
            return Optional.ofNullable(apps.getApps());
        }
        return Optional.empty();
    }
}
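
getAllApplication collapses two possible nulls (the missing apps envelope and a missing inner list) into a single Optional, so a caller can fall back to an empty list in one step. A minimal sketch of such a consumer follows; the AppLister class is hypothetical and the response object is assumed to come from a JSON mapper:

import java.util.Collections;
import java.util.List;

// Hypothetical consumer of YarnApplicationResponse (illustration only).
public class AppLister {
    static List<App> appsOrEmpty(YarnApplicationResponse response) {
        // A missing envelope and a missing inner list both fall back to an
        // empty list, so callers never have to handle null themselves.
        return response.getAllApplication().orElse(Collections.emptyList());
    }
}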


@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.restapi.yarn.rm;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.microsoft.azure.spark.tools.restapi.Convertible;

import java.util.List;

/**
 * An application resource contains information about a particular application
 * that was submitted to a cluster.
 *
 * Based on Hadoop 3.0.0, refer to
 * https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html#Cluster_Application_API
 *
 * Use the following URI to obtain an apps list:
 *   http://rm-http-address:port/ws/v1/cluster/apps
 *
 * HTTP Operations Supported:
 *   GET
 *
 * Query Parameters Supported:
 *   None
 */
@SuppressWarnings("nullness")
public class YarnApplications implements Convertible {
    @JsonProperty(value = "app")
    private List<App> apps;

    public List<App> getApps() {
        return apps;
    }

    public void setApps(List<App> apps) {
        this.apps = apps;
    }
}
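
With the @JsonProperty mapping above, a round trip to that endpoint is short. The following is a minimal sketch, assuming Jackson databind on the classpath and a ResourceManager on its default web port; neither assumption is part of this commit, and the project may route the call through its own HTTP client instead:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URL;

// Sketch: fetch and deserialize the Cluster Applications API response.
public class YarnAppsExample {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
                // These POJOs map only a subset of the REST fields, so
                // unknown properties are ignored rather than rejected.
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        // Assumed RM address; 8088 is the stock ResourceManager web port.
        URL apps = new URL("http://localhost:8088/ws/v1/cluster/apps");
        YarnApplicationResponse response =
                mapper.readValue(apps, YarnApplicationResponse.class);
        response.getAllApplication()
                .ifPresent(list -> System.out.println("apps returned: " + list.size()));
    }
}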


@ -0,0 +1,14 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

package com.microsoft.azure.spark.tools.ux;

import rx.Scheduler;

public interface IdeSchedulers {
    Scheduler processBarVisibleAsync(String title);

    Scheduler processBarVisibleSync(String title);

    Scheduler dispatchUIThread();
}
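
The interface leaves progress-bar and UI-thread scheduling to the IDE layer. A hypothetical RxJava 1.x pipeline would thread through it as below; the SubmissionFlow class and the logLines observable are illustrative assumptions, not part of this commit:

import rx.Observable;

// Sketch: heavy work runs behind a visible progress bar, results land on
// the UI thread supplied by the IDE-specific IdeSchedulers implementation.
public class SubmissionFlow {
    static void run(IdeSchedulers schedulers, Observable<String> logLines) {
        logLines
                .subscribeOn(schedulers.processBarVisibleAsync("Submitting Spark job..."))
                .observeOn(schedulers.dispatchUIThread())
                .subscribe(System.out::println);
    }
}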

11
suppressions.xml Normal file

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<!DOCTYPE suppressions PUBLIC
"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
"https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
<suppress checks="(?:Javadoc|DesignForExtension|FinalParameters|VisibilityModifier|CommentsIndentation).*"
files="com[\\/]microsoft[\\/]azure[\\/]spark[\\/]tools[\\/]restapi[\\/]yarn[\\/]rm[\\/].*.java"
/>
</suppressions>
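
This file only takes effect if checkstyle.xml loads it. A minimal sketch of the wiring, placed inside the Checker module and assuming the suppressions file sits next to checkstyle.xml at the project root (the build may resolve the path differently):

<!-- Assumed placement: inside <module name="Checker"> in checkstyle.xml -->
<module name="SuppressionFilter">
    <property name="file" value="suppressions.xml"/>
    <property name="optional" value="false"/>
</module>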