Merge remote-tracking branch 'jey/hadoop-agnostic'

Conflicts:
	core/src/main/scala/spark/PairRDDFunctions.scala
Matei Zaharia 2013-08-20 10:14:15 -07:00
Parents: d61337f640 6f6944c807
Commit: aa2b89d98d
45 changed files with 680 additions and 1554 deletions

View file

@ -37,56 +37,31 @@
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<properties>
<classifier.name>hadoop1</classifier.name>
</properties>
</profile>
<profile>
<id>hadoop2</id>
<properties>
<classifier.name>hadoop2</classifier.name>
</properties>
</profile>
<profile>
<id>hadoop2-yarn</id>
<properties>
<classifier.name>hadoop2-yarn</classifier.name>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<classifier>${classifier.name}</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<classifier>${classifier.name}</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<classifier>${classifier.name}</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<classifier>${classifier.name}</classifier>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<classifier>${classifier.name}</classifier>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>

View file

@ -32,11 +32,15 @@
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@ -58,103 +62,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -34,6 +34,7 @@ set EXAMPLES_DIR=%FWDIR%examples
set BAGEL_DIR=%FWDIR%bagel
set MLLIB_DIR=%FWDIR%mllib
set TOOLS_DIR=%FWDIR%tools
set YARN_DIR=%FWDIR%yarn
set STREAMING_DIR=%FWDIR%streaming
set PYSPARK_DIR=%FWDIR%python
@ -50,6 +51,7 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%MLLIB_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%TOOLS_DIR%\target\scala-%SCALA_VERSION%\classes
set CLASSPATH=%CLASSPATH%;%YARN_DIR%\target\scala-%SCALA_VERSION%\classes
rem Add hadoop conf dir - else FileSystem.*, etc fail
rem Note, this assumes that there is either a HADOOP_CONF_DIR or YARN_CONF_DIR which hosts

View file

@ -37,6 +37,7 @@ EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
MLLIB_DIR="$FWDIR/mllib"
TOOLS_DIR="$FWDIR/tools"
YARN_DIR="$FWDIR/yarn"
STREAMING_DIR="$FWDIR/streaming"
PYSPARK_DIR="$FWDIR/python"
@ -62,16 +63,18 @@ function dev_classpath {
CLASSPATH="$CLASSPATH:$REPL_DIR/lib/*"
# Add the shaded JAR for Maven builds
if [ -e $REPL_BIN_DIR/target ]; then
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded-hadoop*.jar'`; do
for jar in `find "$REPL_BIN_DIR/target" -name 'spark-repl-*-shaded.jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
# The shaded JAR doesn't contain examples, so include those separately
EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples"*[0-9T].jar`
CLASSPATH+=":$EXAMPLES_JAR"
for jar in `find "$EXAMPLES_DIR/target" -name 'spark-examples*[0-9T].jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done
fi
CLASSPATH="$CLASSPATH:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$MLLIB_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$TOOLS_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$YARN_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
CLASSPATH="$CLASSPATH:$jar"
done

View file

@ -32,6 +32,18 @@
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -126,7 +138,6 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
@ -204,183 +215,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
<source>src/hadoop1/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
<source>src/hadoop2/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
<source>src/hadoop2-yarn/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred
trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
}

View file

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
}

View file

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package spark.deploy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
/**
* Contains util methods to interact with Hadoop from spark.
*/
object SparkHadoopUtil {
def getUserNameFromEnvironment(): String = {
// defaulting to -D ...
System.getProperty("user.name")
}
def runAsUser(func: (Product) => Unit, args: Product) {
// Add support, if exists - for now, simply run func !
func(args)
}
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
def newConfiguration(): Configuration = new Configuration()
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
def addCredentials(conf: JobConf) {}
def isYarnMode(): Boolean = { false }
}

View file

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred
import org.apache.hadoop.mapreduce.TaskType
trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
}

View file

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
import task.{TaskAttemptContextImpl, JobContextImpl}
trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
}

View file

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred
trait HadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
}

View file

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
import task.{TaskAttemptContextImpl, JobContextImpl}
trait HadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
jobId, isMap, taskId, attemptId)
}

View file

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred
trait SparkHadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
}
def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
}
private def firstAvailableClass(first: String, second: String): Class[_] = {
try {
Class.forName(first)
} catch {
case e: ClassNotFoundException =>
Class.forName(second)
}
}
}
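A minimal consumer sketch (class and method names here are hypothetical): anything that mixes in SparkHadoopMapRedUtil gets working job/task contexts whether the classpath provides the Hadoop 1 JobContext class or the Hadoop 2 JobContextImpl, because the concrete class is resolved reflectively at runtime.

import org.apache.hadoop.mapred.{JobConf, JobID, TaskAttemptID, SparkHadoopMapRedUtil}

// Hypothetical helper; the same compiled class runs against Hadoop 1.x or 2.x.
class ExampleCommitSetup(conf: JobConf) extends SparkHadoopMapRedUtil {
  def setupFor(jobId: JobID, attemptId: TaskAttemptID) {
    val jobContext = newJobContext(conf, jobId)               // JobContext on Hadoop 1, JobContextImpl on Hadoop 2
    val taskContext = newTaskAttemptContext(conf, attemptId)
    // ... hand the contexts to an OutputCommitter, as SparkHadoopWriter does
  }
}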

View file

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce
import org.apache.hadoop.conf.Configuration
import java.lang.{Integer => JInteger, Boolean => JBoolean}
trait SparkHadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
val klass = firstAvailableClass(
"org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn
"org.apache.hadoop.mapreduce.JobContext") // hadoop1
val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
}
def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
val klass = firstAvailableClass(
"org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn
"org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1
val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}
def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
try {
// first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
} catch {
case exc: NoSuchMethodException => {
// failed, look for the new ctor that takes a TaskType (not available in 1.x)
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
}
}
}
private def firstAvailableClass(first: String, second: String): Class[_] = {
try {
Class.forName(first)
} catch {
case e: ClassNotFoundException =>
Class.forName(second)
}
}
}
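A short usage sketch (object name hypothetical): callers keep passing the old-style isMap flag, and the trait transparently falls back from the Hadoop 1 Boolean constructor of TaskAttemptID to the Hadoop 2 TaskType constructor when the former is missing.

import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil

// Hypothetical caller; works unchanged on Hadoop 1.x, 2.x and YARN classpaths.
object AttemptIdSketch extends SparkHadoopMapReduceUtil {
  val attempt = newTaskAttemptID("jt-201308200000", 0, true, 3, 0)  // isMap = true becomes TaskType.MAP on Hadoop 2
}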

View file

@ -31,12 +31,14 @@ import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.SparkHadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
@ -50,7 +52,7 @@ import spark.Partitioner._
*/
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
extends Logging
with HadoopMapReduceUtil
with SparkHadoopMapReduceUtil
with Serializable {
/**
@ -627,7 +629,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
conf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
@ -653,7 +655,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
val writer = new HadoopWriter(conf)
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {

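The public save API is unchanged by the rename; only the internal writer and the mixed-in trait differ. A hedged usage sketch, assuming an existing SparkContext named sc and a hypothetical output path:

import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import spark.SparkContext._  // implicit conversion to PairRDDFunctions

// Hypothetical job; saveAsHadoopFile now goes through SparkHadoopWriter internally.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
  .map { case (k, v) => (new Text(k), new IntWritable(v)) }
counts.saveAsHadoopFile[TextOutputFormat[Text, IntWritable]]("hdfs://namenode/tmp/counts")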
View file

@ -52,7 +52,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
@ -235,7 +235,8 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val conf = SparkHadoopUtil.newConfiguration()
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@ -623,10 +624,11 @@ class SparkContext(
logWarning("null specified as parameter to addJar",
new SparkException("null specified as parameter to addJar"))
} else {
val env = SparkEnv.get
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" =>
if (SparkHadoopUtil.isYarnMode()) {
if (env.hadoop.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
return
}
@ -809,8 +811,9 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
val env = SparkEnv.get
val path = new Path(dir)
val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
val fs = path.getFileSystem(env.hadoop.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")

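A brief sketch of the user-visible behavior, assuming a local SparkContext: the Hadoop Configuration is now obtained from SparkEnv's Hadoop util, and S3 credentials are still copied in from the AWS_* environment variables when present.

// Hypothetical session; sc.hadoopConfiguration is built via SparkEnv.get.hadoop.
val sc = new spark.SparkContext("local", "hadoop-agnostic-example")
val hadoopConf = sc.hadoopConfiguration
// Non-empty only if AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY were exported.
println(Option(hadoopConf.get("fs.s3.awsAccessKeyId")))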
View file

@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
import spark.metrics.MetricsSystem
import spark.deploy.SparkHadoopUtil
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@ -58,6 +59,19 @@ class SparkEnv (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if(yarnMode) {
try {
Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
} catch {
case th: Throwable => throw new SparkException("Unable to load YARN support", th)
}
} else {
new SparkHadoopUtil
}
}
def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()

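A minimal sketch of how the new hook is consumed, mirroring the call sites changed elsewhere in this commit: code asks SparkEnv for the Hadoop util instead of the old static SparkHadoopUtil object, and automatically gets the YARN implementation when SPARK_YARN_MODE is set.

// Assumes a SparkEnv has already been created for the current thread.
val env = spark.SparkEnv.get
val conf = env.hadoop.newConfiguration()  // plain SparkHadoopUtil, or YarnSparkHadoopUtil under YARN
if (env.hadoop.isYarnMode()) {
  // YARN-specific handling, e.g. rejecting local jars in addJar (see SparkContext above)
}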
View file

@ -36,7 +36,7 @@ import spark.SerializableWritable
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
splitID = splitid
attemptID = attemptid
jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}
@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
}
object HadoopWriter {
object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())

View file

@ -266,8 +266,9 @@ private object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
val env = SparkEnv.get
val uri = new URI(url)
val conf = SparkHadoopUtil.newConfiguration()
val conf = env.hadoop.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@ -406,10 +407,6 @@ private object Utils extends Logging {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
def getUserNameFromEnvironment(): String = {
SparkHadoopUtil.getUserNameFromEnvironment
}
// Typically, this will be of order of number of nodes in cluster
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()

View file

@ -23,18 +23,7 @@ import org.apache.hadoop.mapred.JobConf
/**
* Contains util methods to interact with Hadoop from spark.
*/
object SparkHadoopUtil {
def getUserNameFromEnvironment(): String = {
// defaulting to -D ...
System.getProperty("user.name")
}
def runAsUser(func: (Product) => Unit, args: Product) {
// Add support, if exists - for now, simply run func !
func(args)
}
class SparkHadoopUtil {
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems
def newConfiguration(): Configuration = new Configuration()

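SparkHadoopUtil is now an ordinary class rather than a singleton, so deployment-specific behavior can live in a subclass that SparkEnv loads reflectively (spark.deploy.yarn.YarnSparkHadoopUtil in this commit). A hedged sketch of such a subclass, assuming addCredentials and isYarnMode remain overridable members as they were on the old object (their use via env.hadoop elsewhere in this diff suggests they do):

import org.apache.hadoop.mapred.JobConf
import spark.deploy.SparkHadoopUtil

// Hypothetical subclass; the real YARN implementation lives in the new yarn module.
class ExampleSecureHadoopUtil extends SparkHadoopUtil {
  override def isYarnMode(): Boolean = true
  override def addCredentials(conf: JobConf) {
    // e.g. propagate delegation tokens into the JobConf on a secure cluster
  }
}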
View file

@ -22,9 +22,8 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import spark.{Logging, Utils}
import spark.{Logging, Utils, SparkEnv}
import spark.TaskState.TaskState
import spark.deploy.SparkHadoopUtil
import spark.scheduler.cluster.StandaloneClusterMessages._
import spark.util.AkkaUtils
@ -82,19 +81,6 @@ private[spark] class StandaloneExecutorBackend(
private[spark] object StandaloneExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
}
// This will be run 'as' the user
def run0(args: Product) {
assert(4 == args.productArity)
runImpl(args.productElement(0).asInstanceOf[String],
args.productElement(1).asInstanceOf[String],
args.productElement(2).asInstanceOf[String],
args.productElement(3).asInstanceOf[Int])
}
private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Debug code
Utils.checkHost(hostname)

View file

@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
import spark.deploy.SparkHadoopUtil
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging {
}
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = SparkEnv.get.serializer.newInstance()
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging {
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
val env = SparkEnv.get
val fs = path.getFileSystem(env.hadoop.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = SparkEnv.get.serializer.newInstance()
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
// Register an on-task-completion callback to close the input stream.
@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging {
import spark._
val Array(cluster, hdfsPath) = args
val env = SparkEnv.get
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
val fs = path.getFileSystem(env.hadoop.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")

View file

@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
import spark.deploy.SparkHadoopUtil
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@ -68,7 +67,8 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
SparkHadoopUtil.addCredentials(conf);
val env = SparkEnv.get
env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)

View file

@ -43,7 +43,7 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with HadoopMapReduceUtil
with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it

View file

@ -17,7 +17,7 @@
package spark.scheduler
import spark.Logging
import spark.{Logging, SparkEnv}
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
import spark.deploy.SparkHadoopUtil
/**
@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val conf = new JobConf(configuration)
SparkHadoopUtil.addCredentials(conf);
env.hadoop.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
val env = SparkEnv.get
val jobConf = new JobConf(configuration)
SparkHadoopUtil.addCredentials(jobConf);
env.hadoop.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =

View file

@ -55,7 +55,7 @@ This would be used to connect to the cluster, write to the dfs and submit jobs t
The command to launch the YARN Client is as follows:
SPARK_JAR=<SPARK_YAR_FILE> ./run spark.deploy.yarn.Client \
SPARK_JAR=<SPARK_YARN_JAR_FILE> ./run spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
@ -68,7 +68,7 @@ The command to launch the YARN Client is as follows:
For example:
SPARK_JAR=./core/target/spark-core-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
SPARK_JAR=./yarn/target/spark-yarn-assembly-{{site.SPARK_VERSION}}.jar ./run spark.deploy.yarn.Client \
--jar examples/target/scala-{{site.SCALA_VERSION}}/spark-examples_{{site.SCALA_VERSION}}-{{site.SPARK_VERSION}}.jar \
--class spark.examples.SparkPi \
--args yarn-standalone \

View file

@ -32,6 +32,36 @@
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@ -55,41 +85,41 @@
<artifactId>scalacheck_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>1.2.5</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
</exclusion>
<exclusion>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra.deps</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>1.2.5</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.googlecode.concurrentlinkedhashmap</groupId>
<artifactId>concurrentlinkedhashmap-lru</artifactId>
</exclusion>
<exclusion>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.cassandra.deps</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
@ -101,154 +131,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-mllib</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -21,7 +21,6 @@ import java.util.Random
import scala.math.exp
import spark.util.Vector
import spark._
import spark.deploy.SparkHadoopUtil
import spark.scheduler.InputFormatInfo
/**
@ -52,7 +51,7 @@ object SparkHdfsLR {
System.exit(1)
}
val inputPath = args(1)
val conf = SparkHadoopUtil.newConfiguration()
val conf = SparkEnv.get.hadoop.newConfiguration()
val sc = new SparkContext(args(0), "SparkHdfsLR",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")), Map(),
InputFormatInfo.computePreferredLocations(

View file

@ -24,9 +24,10 @@
# so it is completely self contained.
# It does not contain source or *.class files.
#
# Arguments
# (none): Creates dist/ directory
# tgz: Additionally creates spark-$VERSION-bin.tar.gz
# Optional Arguments
# --tgz: Additionally creates spark-$VERSION-bin.tar.gz
# --hadoop VERSION: Builds against specified version of Hadoop.
# --with-yarn: Enables support for Hadoop YARN.
#
# Recommended deploy/testing procedure (standalone mode):
# 1) Rsync / deploy the dist/ dir to one host
@ -44,20 +45,50 @@ DISTDIR="$FWDIR/dist"
export TERM=dumb # Prevents color codes in SBT output
VERSION=$($FWDIR/sbt/sbt "show version" | tail -1 | cut -f 2 | sed 's/^\([a-zA-Z0-9.-]*\).*/\1/')
if [ "$1" == "tgz" ]; then
echo "Making spark-$VERSION-bin.tar.gz"
# Initialize defaults
SPARK_HADOOP_VERSION=1.2.1
SPARK_WITH_YARN=false
MAKE_TGZ=false
# Parse arguments
while (( "$#" )); do
case $1 in
--hadoop)
SPARK_HADOOP_VERSION="$2"
shift
;;
--with-yarn)
SPARK_WITH_YARN=true
;;
--tgz)
MAKE_TGZ=true
;;
esac
shift
done
if [ "$MAKE_TGZ" == "true" ]; then
echo "Making spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz"
else
echo "Making distribution for Spark $VERSION in $DISTDIR..."
fi
echo "Hadoop version set to $SPARK_HADOOP_VERSION"
if [ "$SPARK_WITH_YARN" == "true" ]; then
echo "YARN enabled"
else
echo "YARN disabled"
fi
# Build fat JAR
$FWDIR/sbt/sbt "repl/assembly"
export SPARK_HADOOP_VERSION
export SPARK_WITH_YARN
"$FWDIR/sbt/sbt" "repl/assembly"
# Make directories
rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/jars"
echo "$VERSION" >$DISTDIR/RELEASE
echo "$VERSION" > "$DISTDIR/RELEASE"
# Copy jars
cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
@ -69,9 +100,9 @@ cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
cp "$FWDIR/spark-executor" "$DISTDIR"
if [ "$1" == "tgz" ]; then
if [ "$MAKE_TGZ" == "true" ]; then
TARDIR="$FWDIR/spark-$VERSION"
cp -r $DISTDIR $TARDIR
tar -zcf spark-$VERSION-bin.tar.gz -C $FWDIR spark-$VERSION
rm -rf $TARDIR
cp -r "$DISTDIR" "$TARDIR"
tar -zcf "spark-$VERSION-hadoop_$SPARK_HADOOP_VERSION-bin.tar.gz" -C "$FWDIR" "spark-$VERSION"
rm -rf "$TARDIR"
fi

View file

@ -32,6 +32,11 @@
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -41,7 +46,6 @@
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@ -68,103 +72,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

pom.xml (224 changed lines)
View file

@ -73,8 +73,9 @@
<mesos.version>0.12.1</mesos.version>
<akka.version>2.0.3</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<cdh.version>4.1.2</cdh.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.2.1</hadoop.version>
<!-- <hadoop.version>2.0.0-mr1-cdh4.1.2</hadoop.version> -->
<PermGen>64m</PermGen>
<MaxPermGen>512m</MaxPermGen>
@ -320,6 +321,54 @@
<version>0.8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.7.4</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
@ -525,60 +574,6 @@
</build>
<profiles>
<profile>
<id>hadoop1</id>
<properties>
<hadoop.major.version>1</hadoop.major.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.4</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<profile>
<id>hadoop2</id>
<properties>
<hadoop.major.version>2</hadoop.major.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>2.0.0-mr1-cdh${cdh.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.0.0-mr1-cdh${cdh.version}</version>
</dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.7.4</version>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<profile>
<id>hadoop2-yarn</id>
<properties>
@ -588,6 +583,10 @@
<yarn.version>2.0.5-alpha</yarn.version>
</properties>
<modules>
<module>yarn</module>
</modules>
<repositories>
<repository>
<id>maven-root</id>
@ -609,32 +608,125 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${yarn.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${yarn.version}</version>
</dependency>
<!-- Specify Avro version because Kafka also has it as a dependency -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<version>1.7.4</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-xc</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>

View file

@ -26,28 +26,19 @@ import AssemblyKeys._
object SparkBuild extends Build {
// Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
// "1.0.4" for Apache releases, or "0.20.2-cdh3u5" for Cloudera Hadoop.
val HADOOP_VERSION = "1.0.4"
val HADOOP_MAJOR_VERSION = "1"
val HADOOP_YARN = false
// For Hadoop 2 versions such as "2.0.0-mr1-cdh4.1.1", set the HADOOP_MAJOR_VERSION to "2"
//val HADOOP_VERSION = "2.0.0-mr1-cdh4.1.1"
//val HADOOP_MAJOR_VERSION = "2"
//val HADOOP_YARN = false
// For Hadoop 2 YARN support
//val HADOOP_VERSION = "2.0.2-alpha"
//val HADOOP_MAJOR_VERSION = "2"
//val HADOOP_YARN = true
// Note that these variables can be set through the environment variables
// SPARK_HADOOP_VERSION and SPARK_WITH_YARN.
val DEFAULT_HADOOP_VERSION = "1.2.1"
val DEFAULT_WITH_YARN = false
// HBase version; set as appropriate.
val HBASE_VERSION = "0.94.6"
lazy val root = Project("root", file("."), settings = rootSettings) aggregate(core, repl, examples, bagel, streaming, mllib, tools)
lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects:_*)
lazy val core = Project("core", file("core"), settings = coreSettings)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib)
lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn(core) dependsOn(bagel) dependsOn(mllib) dependsOn(maybeYarn:_*)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming) dependsOn(mllib)
@ -59,10 +50,24 @@ object SparkBuild extends Build {
lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn (core)
lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn (core)
// A configuration to set an alternative publishLocalConfiguration
lazy val MavenCompile = config("m2r") extend(Compile)
lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
// Allows build configuration to be set through environment variables
lazy val hadoopVersion = scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
lazy val isYarnMode = scala.util.Properties.envOrNone("SPARK_WITH_YARN") match {
case None => DEFAULT_WITH_YARN
case Some(v) => v.toBoolean
}
// Conditionally include the yarn sub-project
lazy val maybeYarn = if(isYarnMode) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if(isYarnMode) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
lazy val allProjects = Seq[ProjectReference](core, repl, examples, bagel, streaming, mllib, tools) ++ maybeYarnRef
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.8.0-SNAPSHOT",
@ -183,37 +188,15 @@ object SparkBuild extends Build {
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.avro" % "avro" % "1.7.4",
"org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
) ++ (
if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) {
Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-api" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-common" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
} else {
Seq(
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
}
} else {
Seq("org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeJackson, excludeNetty) )
}),
unmanagedSourceDirectories in Compile <+= baseDirectory{ _ /
( if (HADOOP_YARN && HADOOP_MAJOR_VERSION == "2") {
"src/hadoop2-yarn/scala"
} else {
"src/hadoop" + HADOOP_MAJOR_VERSION + "/scala"
} )
}
)
) ++ assemblySettings ++ extraAssemblySettings
def rootSettings = sharedSettings ++ Seq(
@ -272,6 +255,17 @@ object SparkBuild extends Build {
)
) ++ assemblySettings ++ extraAssemblySettings
def yarnSettings = sharedSettings ++ Seq(
name := "spark-yarn",
libraryDependencies ++= Seq(
// Exclude rule required for all ?
"org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
)
) ++ assemblySettings ++ extraAssemblySettings
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
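The SparkBuild.scala changes above replace the hard-coded HADOOP_VERSION / HADOOP_MAJOR_VERSION / HADOOP_YARN constants with settings read from the environment. As a rough standalone sketch (not part of the commit; the HadoopBuildConfigSketch object and its hadoopArtifacts helper are illustrative only), the resolution logic reduces to roughly this:

object HadoopBuildConfigSketch {
  // Mirrors the lazy vals added to SparkBuild.scala: the Hadoop version comes from
  // the environment (falling back to the default), and YARN support is opt-in.
  val hadoopVersion: String =
    scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION", "1.2.1")
  val isYarnMode: Boolean =
    scala.util.Properties.envOrNone("SPARK_WITH_YARN").exists(_.toBoolean)

  // Hypothetical helper: the Hadoop artifacts such a build would pull in.
  def hadoopArtifacts: Seq[String] = {
    val base = Seq("org.apache.hadoop:hadoop-client:" + hadoopVersion)
    val yarn =
      if (isYarnMode)
        Seq("hadoop-yarn-api", "hadoop-yarn-common", "hadoop-yarn-client")
          .map(a => "org.apache.hadoop:" + a + ":" + hadoopVersion)
      else
        Seq.empty[String]
    base ++ yarn
  }

  def main(args: Array[String]): Unit = hadoopArtifacts.foreach(println)
}

With this in place, a developer would typically export SPARK_HADOOP_VERSION (and SPARK_WITH_YARN=true when the yarn sub-project is wanted) before running the sbt build, instead of editing hard-coded constants in the build file.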

View file

@ -32,11 +32,31 @@
<url>http://spark-project.org/</url>
<properties>
<deb.pkg.name>spark-${classifier}</deb.pkg.name>
<deb.install.path>/usr/share/spark-${classifier}</deb.install.path>
<deb.pkg.name>spark</deb.pkg.name>
<deb.install.path>/usr/share/spark</deb.install.path>
<deb.user>root</deb.user>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
@ -44,7 +64,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar</outputFile>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
@ -85,143 +105,13 @@
</build>
<profiles>
<profile>
<id>hadoop1</id>
<properties>
<classifier>hadoop1</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop2</id>
<properties>
<classifier>hadoop2</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>hadoop2-yarn</id>
<properties>
<classifier>hadoop2-yarn</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-yarn</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
@ -261,7 +151,7 @@
<compression>gzip</compression>
<dataSet>
<data>
<src>${project.build.directory}/${project.artifactId}-${project.version}-shaded-${classifier}.jar</src>
<src>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</src>
<type>file</type>
<mapper>
<type>perm</type>

View file

@ -37,6 +37,17 @@
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -57,7 +68,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@ -115,181 +125,16 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<properties>
<classifier>hadoop1</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<properties>
<classifier>hadoop2</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<properties>
<classifier>hadoop2-yarn</classifier>
</properties>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<artifactId>spark-yarn</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-examples</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -40,6 +40,11 @@
</repositories>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
@ -115,103 +120,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View file

@ -31,6 +31,16 @@
<url>http://spark-project.org/</url>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.version}</artifactId>
@ -56,121 +66,4 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop1</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop1</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop1</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<version>${project.version}</version>
<classifier>hadoop2-yarn</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<classifier>hadoop2-yarn</classifier>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

yarn/pom.xml (new file, 111 lines added)
View file

@ -0,0 +1,111 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
<version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-yarn</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark-project.org/</url>
<build>
<outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<outputFile>${project.build.directory}/${project.artifactId}-${project.version}-shaded.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>hadoop2-yarn</id>
<dependencies>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-ipc</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>

View file

@ -124,18 +124,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private def waitForSparkMaster() {
logInfo("Waiting for spark driver to be reachable.")
var driverUp = false
while(!driverUp) {
var tries = 0
while(!driverUp && tries < 10) {
val driverHost = System.getProperty("spark.driver.host")
val driverPort = System.getProperty("spark.driver.port")
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
logInfo("Master now available: " + driverHost + ":" + driverPort)
logInfo("Driver now available: " + driverHost + ":" + driverPort)
driverUp = true
} catch {
case e: Exception =>
logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
Thread.sleep(100)
tries = tries + 1
}
}
}
@ -176,7 +178,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
var sparkContext: SparkContext = null
ApplicationMaster.sparkContextRef.synchronized {
var count = 0
while (ApplicationMaster.sparkContextRef.get() == null) {
while (ApplicationMaster.sparkContextRef.get() == null && count < 10) {
logInfo("Waiting for spark context initialization ... " + count)
count = count + 1
ApplicationMaster.sparkContextRef.wait(10000L)
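Both loops in ApplicationMaster above now give up after a bounded number of attempts instead of spinning forever. A minimal, self-contained sketch of the same retry pattern (host, port, and the 100 ms pause are illustrative values; only the structure mirrors the patch):

import java.net.Socket

object DriverProbeSketch {
  // Returns true if a TCP connection to the driver succeeds within maxTries attempts.
  def waitForDriver(host: String, port: Int, maxTries: Int = 10): Boolean = {
    var tries = 0
    var driverUp = false
    while (!driverUp && tries < maxTries) {
      try {
        new Socket(host, port).close() // connect-and-close is enough to prove reachability
        driverUp = true
      } catch {
        case _: Exception =>
          Thread.sleep(100)            // brief pause before the next attempt
          tries += 1
      }
    }
    driverUp
  }
}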

View file

@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
SparkHadoopUtil.setYarnMode(env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
localResources("spark.jar").getResource().getFile().toString()
@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
object Client {
def main(argStrings: Array[String]) {
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
SparkHadoopUtil.setYarnMode()
new Client(args).run
}
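Client.scala now signals YARN mode with a plain SPARK_YARN_MODE flag (a system property in the client JVM, an environment variable for launched containers) instead of calling SparkHadoopUtil.setYarnMode. A hedged sketch of how downstream code could read that flag, mirroring the property-then-environment lookup the old helper used (the YarnModeFlagSketch object is illustrative, not part of the commit):

object YarnModeFlagSketch {
  // True when either the JVM system property or the environment variable is set to "true".
  def isYarnMode: Boolean = {
    val flag = Option(System.getProperty("SPARK_YARN_MODE"))
      .orElse(Option(System.getenv("SPARK_YARN_MODE")))
    flag.exists(s => java.lang.Boolean.parseBoolean(s))
  }
}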

View file

@ -15,8 +15,9 @@
* limitations under the License.
*/
package spark.deploy
package spark.deploy.yarn
import spark.deploy.SparkHadoopUtil
import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
@ -28,48 +29,17 @@ import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
*/
object SparkHadoopUtil {
val yarnConf = newConfiguration()
def getUserNameFromEnvironment(): String = {
// defaulting to env if -D is not present ...
val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name))
// If nothing found, default to user we are running as
if (retval == null) System.getProperty("user.name") else retval
}
def runAsUser(func: (Product) => Unit, args: Product) {
runAsUser(func, args, getUserNameFromEnvironment())
}
def runAsUser(func: (Product) => Unit, args: Product, user: String) {
func(args)
}
class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
def isYarnMode(): Boolean = {
val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
java.lang.Boolean.valueOf(yarnMode)
}
// Set an env variable indicating we are running in YARN mode.
// Note that anything with SPARK prefix gets propagated to all (remote) processes
def setYarnMode() {
System.setProperty("SPARK_YARN_MODE", "true")
}
def setYarnMode(env: HashMap[String, String]) {
env("SPARK_YARN_MODE") = "true"
}
override def isYarnMode(): Boolean = { true }
// Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop subsystems.
// Always create a new config; don't reuse yarnConf.
def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
def addCredentials(conf: JobConf) {
override def addCredentials(conf: JobConf) {
val jobCreds = conf.getCredentials();
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}