This commit is contained in:
tawan0109 2016-01-08 13:04:12 +08:00
Parent 3cf413a2d4
Commit 721a63b74e
14 changed files with 208 additions and 137 deletions

View file

@ -53,9 +53,9 @@ Reference: Spark scalastyle-config.xml (https://github.com/apache/spark/blob/mas
<check level="error" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
<parameters>
<parameter name="header"><![CDATA[/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/]]></parameter>
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/]]></parameter>
</parameters>
</check>
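The HeaderMatchesChecker above makes scalastyle fail with an error unless every Scala source file opens with the configured block-comment header verbatim, which is why the file diffs below switch the // line comments to the /* */ form. A minimal compliant file might look like the following sketch (file and object names are hypothetical):

/*
 * Copyright (c) Microsoft. All rights reserved.
 * Licensed under the MIT license. See LICENSE file in the project root for full license information.
 */
package org.apache.spark.api.csharp

// Hypothetical example: any file whose first lines differ from the configured
// header fails the HeaderMatchesChecker.
object HeaderExample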

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.api.csharp

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.api.csharp
@ -9,9 +11,9 @@ import java.net.Socket
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import org.apache.spark.api.csharp.SerDe._ //TODO - work with SparkR devs to make this configurable and reuse RBackendHandler
// TODO - work with SparkR devs to make this configurable and reuse RBackendHandler
import org.apache.spark.api.csharp.SerDe._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashMap
/**
@ -19,8 +21,8 @@ import scala.collection.mutable.HashMap
* This implementation is identical to RBackendHandler and that can be reused
* in SparkCLR if SerDe is made pluggable
*/
// Since SparkCLR is a package to Spark and not a part of spark-core it mirrors the implementation of
// selected parts from RBackend with SparkCLR customizations
// Since SparkCLR is a package to Spark and not a part of spark-core, it mirrors the implementation
// of selected parts from RBackend with SparkCLR customizations
@Sharable
class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHandler[Array[Byte]] {
@ -60,6 +62,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
val t = readObjectType(dis)
assert(t == 'i')
val port = readInt(dis)
// scalastyle:off println
println("Connecting to a callback server at port " + port)
CSharpBackend.callbackPort = port
writeInt(dos, 0)
@ -67,6 +70,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
case "closeCallback" =>
// Send close to CSharp callback server.
println("Requesting to close all call back sockets.")
// scalastyle:on
var socket: Socket = null
do {
socket = CSharpBackend.callbackSockets.poll()
@ -101,7 +105,9 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
// Close the connection when an exception is raised.
// scalastyle:off println
println("Exception caught: " + cause.getMessage)
// scalastyle:on
cause.printStackTrace()
ctx.close()
}
@ -164,13 +170,14 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
}
} catch {
case e: Exception =>
//TODO - logError does not work now..fix //logError(s"$methodName on $objId failed", e)
// TODO - logError does not work now..fix //logError(s"$methodName on $objId failed", e)
val jvmObj = JVMObjectTracker.get(objId)
val jvmObjName = jvmObj match
{
case Some(jObj) => jObj.getClass.getName
case None => "NullObject"
}
// scalastyle:off println
println(s"$methodName on object of type $jvmObjName failed")
println(e.getMessage)
println(e.printStackTrace())
@ -188,6 +195,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
}
})
}
// scalastyle:on println
writeInt(dos, -1)
writeString(dos, Utils.exceptionString(e.getCause))
}
@ -241,6 +249,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
true
}
// scalastyle:off println
def logError(id: String) {
println(id)
}
@ -248,6 +257,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
def logWarning(id: String) {
println(id)
}
// scalastyle:on println
def logError(id: String, e: Exception): Unit = {
@ -286,4 +296,4 @@ private object JVMObjectTracker {
objMap.remove(id)
}
}
}
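Most of the other changes in this file wrap the remaining println calls in scalastyle suppression comments instead of removing them. The pattern, sketched here with a hypothetical method, disables the println rule only for the enclosed lines and re-enables it afterwards, so the rest of the file stays checked:

// Illustrative only: suppress the scalastyle "println" rule for a small region.
def reportCallbackPort(port: Int): Unit = {
  // scalastyle:off println
  println("Connecting to a callback server at port " + port)
  // scalastyle:on println
}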

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.api.csharp
@ -22,20 +24,30 @@ import org.apache.spark.util.csharp.{Utils => CSharpUtils}
* it just extends from it without overriding any behavior for now
*/
class CSharpRDD(
@transient parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
cSharpIncludes: JList[String],
preservePartitioning: Boolean,
cSharpWorkerExecutable: String,
unUsedVersionIdentifier: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
extends PythonRDD (parent, command, envVars, cSharpIncludes, preservePartitioning, cSharpWorkerExecutable, unUsedVersionIdentifier, broadcastVars, accumulator) {
@transient parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
cSharpIncludes: JList[String],
preservePartitioning: Boolean,
cSharpWorkerExecutable: String,
unUsedVersionIdentifier: String,
broadcastVars: JList[Broadcast[PythonBroadcast]],
accumulator: Accumulator[JList[Array[Byte]]])
extends PythonRDD (
parent,
command,
envVars,
cSharpIncludes,
preservePartitioning,
cSharpWorkerExecutable,
unUsedVersionIdentifier,
broadcastVars,
accumulator) {
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
unzip(new File(cSharpWorkerExecutable).getAbsoluteFile.getParentFile)
logInfo(s"compute CSharpRDD[${this.id}], stageId: ${context.stageId()}, partitionId: ${context.partitionId()}, split_index: ${split.index}")
logInfo(s"compute CSharpRDD[${this.id}], stageId: ${context.stageId()}" +
s", partitionId: ${context.partitionId()}, split_index: ${split.index}")
// fill rddId, stageId and partitionId info
val bb = ByteBuffer.allocate(12)
@ -52,10 +64,12 @@ class CSharpRDD(
/**
* Uncompress all zip files under directory cSharpWorkerWorkingDir.
* As .zip file is supported to be submitted by sparkclr-submit.cmd, and there might be some runtime dependencies of cSharpWorker.exe in the zip files,
* As .zip file is supported to be submitted by sparkclr-submit.cmd, and there might be
* some runtime dependencies of cSharpWorker.exe in the zip files,
* so before start to execute cSharpWorker.exe, uncompress all zip files first.
*
* One executor might process multiple splits, if zip files have already been unzipped in the previous split, there is no need to unzip them again.
* One executor might process multiple splits, if zip files have already been unzipped
* in the previous split, there is no need to unzip them again.
* Once uncompression is done, a flag file "doneFlag" will be created.
* @param cSharpWorkerWorkingDir directory where cSharpWorker.exe is located
*/
@ -84,7 +98,8 @@ class CSharpRDD(
val unzippingFlag = new File(cSharpWorkerWorkingDir, unzippingFlagName)
// if another thread is uncompressing files, current thread just needs to wait the operation done and return
// if another thread is uncompressing files,
// current thread just needs to wait the operation done and return
if (unzippingFlag.exists()) {
waitUnzipOperationDone(doneFlag)
return
@ -115,7 +130,7 @@ class CSharpRDD(
// so if obtain the lock successfully, there is no chance that the unzippingFlag still exists
unzippingFlag.createNewFile()
//unzip file
// unzip file
for (zipFile <- files) {
CSharpUtils.unzip(new File(cSharpWorkerWorkingDir, zipFile), cSharpWorkerWorkingDir)
logInfo("Unzip file: " + zipFile)
@ -165,7 +180,10 @@ class CSharpRDD(
}
object CSharpRDD {
def createRDDFromArray(sc: SparkContext, arr: Array[Array[Byte]], numSlices: Int): JavaRDD[Array[Byte]] = {
def createRDDFromArray(
sc: SparkContext,
arr: Array[Array[Byte]],
numSlices: Int): JavaRDD[Array[Byte]] = {
JavaRDD.fromRDD(sc.parallelize(arr, numSlices))
}
}
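The doc comment above describes the unzip coordination in terms of two marker files: the thread that manages to create unzippingFlag performs the extraction and then writes doneFlag, while any other thread that sees unzippingFlag just waits for doneFlag. A much-simplified sketch of that protocol, with assumed helper names and without the error handling of the real method, is:

import java.io.File
import org.apache.spark.util.csharp.{Utils => CSharpUtils}

object UnzipSketch {
  // Assumed simplification of CSharpRDD.unzip; flag-file names follow the comments above.
  def ensureUnzipped(workingDir: File, zipFiles: Seq[String]): Unit = {
    val doneFlag = new File(workingDir, "doneFlag")
    val unzippingFlag = new File(workingDir, "unzippingFlag")
    if (doneFlag.exists()) return                   // a previous split already unzipped everything
    if (!unzippingFlag.createNewFile()) {           // another thread holds the unzip "lock"
      while (!doneFlag.exists()) Thread.sleep(100)  // wait for that thread to finish
      return
    }
    zipFiles.foreach(f => CSharpUtils.unzip(new File(workingDir, f), workingDir))
    doneFlag.createNewFile()
    unzippingFlag.delete()
  }
}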

View file

@ -1,19 +1,20 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.api.csharp
import java.io.{DataOutputStream, DataInputStream}
import java.sql.{Time, Timestamp, Date}
import scala.collection.JavaConversions._
import scala.collection.JavaConversions._
/**
* Functions to serialize and deserialize between CLR & JVM.
* This implementation of methods is mostly identical to the SerDe implementation in R.
*/
//TODO look into the possibility of reusing SerDe from R implementation
// TODO look into the possibility of reusing SerDe from R implementation
object SerDe {
def readObjectType(dis: DataInputStream): Char = {
dis.readByte().toChar
@ -66,8 +67,7 @@ object SerDe {
def readStringBytes(in: DataInputStream, len: Int): String = {
val bytes = new Array[Byte](len)
in.readFully(bytes)
//assert(bytes(len - 1) == 0)
val str = new String(bytes/*.dropRight(1)*/, "UTF-8")
val str = new String(bytes, "UTF-8")
str
}
@ -77,8 +77,6 @@ object SerDe {
}
def readBoolean(in: DataInputStream): Boolean = {
//val intVal = in.readInt()
//if (intVal == 0) false else true
return in.readBoolean()
}
@ -156,7 +154,7 @@ object SerDe {
}
}
//Using the same mapping as SparkR implementation for now
// Using the same mapping as SparkR implementation for now
// Methods to write out data from Java to C#
//
// Type mapping from Java to C#
@ -274,8 +272,6 @@ object SerDe {
}
def writeBoolean(out: DataOutputStream, value: Boolean): Unit = {
//val intValue = if (value) 1 else 0
//out.writeInt(intValue)
out.writeBoolean(value)
}
@ -293,10 +289,6 @@ object SerDe {
// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {
/*val len = value.length
out.writeInt(len + 1) // For the \0
out.writeBytes(value)
out.writeByte(0)*/
val len = value.length
out.writeInt(len)
out.writeBytes(value)
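For reference, writeString above emits a 4-byte length followed by the raw bytes (ASCII only for now, per the NOTE), and readStringBytes reads that many bytes back and decodes them as UTF-8. A minimal round trip of that wire format, written directly against java.io streams rather than SerDe itself, looks like:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Write: length prefix plus raw bytes, mirroring SerDe.writeString.
val bos = new ByteArrayOutputStream()
val dos = new DataOutputStream(bos)
val value = "SparkCLR"
dos.writeInt(value.length)
dos.writeBytes(value)

// Read: length prefix, then that many bytes, mirroring SerDe.readStringBytes.
val dis = new DataInputStream(new ByteArrayInputStream(bos.toByteArray))
val bytes = new Array[Byte](dis.readInt())
dis.readFully(bytes)
assert(new String(bytes, "UTF-8") == value)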

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.deploy.csharp
@ -15,14 +17,15 @@ import org.apache.spark.util.{Utils, RedirectThread}
import org.apache.spark.util.csharp.{Utils => CSharpSparkUtils}
/**
* Launched by sparkclr-submit.cmd. It launches CSharpBackend, gets its port number and launches C# process
* passing the port number to it.
* The runner implementation is mostly identical to RRunner with SparkCLR-specific customizations
* Launched by sparkclr-submit.cmd. It launches CSharpBackend,
* gets its port number and launches C# process passing the port number to it.
* The runner implementation is mostly identical to RRunner with SparkCLR-specific customizations.
*/
// scalastyle:off println
object CSharpRunner {
def main(args: Array[String]): Unit = {
//determines if CSharpBackend need to be run in debug mode
//in debug mode this runner will not launch C# process
// determines if CSharpBackend need to be run in debug mode
// in debug mode this runner will not launch C# process
var runInDebugMode = false
if (args.length == 0) {
@ -31,14 +34,15 @@ object CSharpRunner {
if (args.length == 1 && args(0).equalsIgnoreCase("debug")) {
runInDebugMode = true
println("[CSharpRunner.main] Debug mode is set. CSharp executable will not be launched as a sub-process.")
println("[CSharpRunner.main] Debug mode is set. " +
"CSharp executable will not be launched as a sub-process.")
}
var csharpExecutable = ""
var otherArgs: Array[String] = null
if (!runInDebugMode) {
if(args(0).toLowerCase.endsWith(".zip")) {
if (args(0).toLowerCase.endsWith(".zip")) {
var zipFileName = args(0)
val driverDir = new File("").getAbsoluteFile
@ -49,11 +53,14 @@ object CSharpRunner {
println(s"[CSharpRunner.main] Unzipping driver $zipFileName in $driverDir")
CSharpSparkUtils.unzip(new File(zipFileName), driverDir)
csharpExecutable = PythonRunner.formatPath(args(1)) //reusing windows-specific formatting in PythonRunner
// reusing windows-specific formatting in PythonRunner
csharpExecutable = PythonRunner.formatPath(args(1))
otherArgs = args.slice(2, args.length)
} else if(new File(args(0)).isDirectory) {
// In local mode, there will no zip file generated if given a directory, skip uncompression in this case
csharpExecutable = PythonRunner.formatPath(args(1)) //reusing windows-specific formatting in PythonRunner
} else if (new File(args(0)).isDirectory) {
// In local mode, there will no zip file generated if given a directory,
// skip uncompression in this case
// reusing windows-specific formatting in PythonRunner
csharpExecutable = PythonRunner.formatPath(args(1))
otherArgs = args.slice(2, args.length)
} else {
csharpExecutable = PythonRunner.formatPath(args(0))
@ -80,7 +87,8 @@ object CSharpRunner {
val csharpBackendThread = new Thread("CSharpBackend") {
override def run() {
csharpBackendPortNumber = csharpBackend.init()
println("[CSharpRunner.main] Port number used by CSharpBackend is " + csharpBackendPortNumber) //TODO - send to logger also
println("[CSharpRunner.main] Port number used by CSharpBackend is "
+ csharpBackendPortNumber) // TODO - send to logger also
initialized.release()
csharpBackend.run()
}
@ -98,7 +106,8 @@ object CSharpRunner {
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
env.put(key, value)
println("[CSharpRunner.main] adding key=" + key + " and value=" + value + " to environment")
println("[CSharpRunner.main] adding key=" + key
+ " and value=" + value + " to environment")
}
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
@ -110,7 +119,8 @@ object CSharpRunner {
returnCode = process.waitFor()
closeBackend(csharpBackend)
} catch {
case t: Throwable => println("[CSharpRunner.main]" + t.getMessage + "\n" + t.getStackTrace)
case t: Throwable =>
println("[CSharpRunner.main]" + t.getMessage + "\n" + t.getStackTrace)
}
println("[CSharpRunner.main] Return CSharpBackend code " + returnCode)
@ -124,9 +134,8 @@ object CSharpRunner {
CSharpSparkUtils.exit(0)
}
} else {
// scalastyle:off println
println("[CSharpRunner.main] CSharpBackend did not initialize in " + backendTimeout + " seconds")
// scalastyle:on println
println("[CSharpRunner.main] CSharpBackend did not initialize in "
+ backendTimeout + " seconds")
CSharpSparkUtils.exit(-1)
}
}
@ -167,3 +176,4 @@ object CSharpRunner {
csharpBackend.close()
}
}
// scalastyle:on println
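The hunks above show the startup handshake: CSharpBackend.init() runs on a dedicated thread, the main thread blocks on a semaphore until the backend reports its port (or a timeout expires), and only then is the C# executable launched with that port. A stripped-down sketch of that handshake, with stand-in stubs for the backend and the process launch, is:

import java.util.concurrent.{Semaphore, TimeUnit}

object RunnerSketch {
  // Stand-ins for csharpBackend.init()/run() and the ProcessBuilder launch in CSharpRunner.
  def initBackend(): Int = 12345
  def serveBackend(): Unit = Thread.sleep(1000)
  def launchCSharpProcess(port: Int): Unit =
    Console.out.println(s"[sketch] would launch CSharpWorker against port $port")

  def main(args: Array[String]): Unit = {
    val backendTimeout = 120 // seconds; illustrative value, the real one is defined in CSharpRunner
    val initialized = new Semaphore(0)
    var portNumber = 0
    new Thread("CSharpBackend") {
      override def run(): Unit = {
        portNumber = initBackend()
        initialized.release()
        serveBackend()
      }
    }.start()
    if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
      launchCSharpProcess(portNumber)
    } else {
      Console.err.println(s"CSharpBackend did not initialize in $backendTimeout seconds")
    }
  }
}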

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.launcher
@ -16,6 +18,7 @@ import scala.util.control.Breaks._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
// scalastyle:off println
object SparkCLRSubmitArguments {
val csharpRunnerClass: String = "org.apache.spark.deploy.csharp.CSharpRunner"
@ -31,10 +34,16 @@ object SparkCLRSubmitArguments {
/**
* Parses and encapsulates arguments from the SparkCLR-submit script.
*
* Current implementation needs to access "opts" attributes from SparkSubmitOptionParser, and the "opts" can only be accessed in the same package.
* Current implementation needs to access "opts" attributes from SparkSubmitOptionParser,
* and the "opts" can only be accessed in the same package.
* This is the reason why this class is put into current package.
*/
class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitFn: Int => Unit, printStream: PrintStream) extends SparkSubmitArgumentsParser {
class SparkCLRSubmitArguments(
args: Seq[String],
env: Map[String, String],
exitFn: Int => Unit,
printStream: PrintStream)
extends SparkSubmitArgumentsParser {
import SparkCLRSubmitArguments._
@ -64,7 +73,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var sparkCLRJarPath: String = new File(CSharpRunner.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getPath
var sparkCLRJarPath: String =
new File(CSharpRunner.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getPath
var remoteSparkCLRJarPath: String = null
@ -149,8 +159,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
/**
* As "opts" is a final variable, we can't append a new element to it, or assign a new array to it.
* As a workaround, we build a new option array `options` for parsing.
* As "opts" is a final variable, we can't append a new element to it, or assign a new array to
* it. As a workaround, we build a new option array `options` for parsing.
*/
private def updateOpts(): Unit = {
// Compared to the original options, we remove `--class` option, add two new options `--exe`
@ -264,7 +274,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
* treated as application arguments.
*/
override protected def handleUnknown(opt: String): Boolean = {
// need to give user hints that "--class" option is not supported in csharpspark-submit.cmd, use --main-executable instead.
// need to give user hints that "--class" option is not supported in csharpspark-submit.cmd,
// use --main-executable instead.
if (opt == CLASS) {
SparkSubmit.printErrorAndExit(s"Option '$CLASS' is not supported in SparkCLR submission.")
@ -284,9 +295,10 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
private def inferSubmitArguments(): Unit = {
//figure out deploy mode
// figure out deploy mode
deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
master = Option(master).orElse(sparkProperties.get("spark.master")).orElse(env.get("MASTER")).orNull
master = Option(master).orElse(sparkProperties.get("spark.master"))
.orElse(env.get("MASTER")).orNull
master match {
case "yarn-cluster" => deployMode = "cluster"
case "yarn-client" => deployMode = "client"
@ -303,7 +315,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
if (primaryResource == null) {
printErrorAndExit("No primary resource found; Please specify one with a zip file or a directory)")
printErrorAndExit("No primary resource found; " +
"Please specify one with a zip file or a directory)")
}
if (mainExecutable == null || !mainExecutable.toLowerCase().endsWith(".exe")) {
@ -312,7 +325,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
if (deployMode == "cluster" && master.startsWith("spark://")) {
if (remoteSparkCLRJarPath == null) {
printErrorAndExit(s"No remote sparkclr jar found; please specify one with option $REMOTE_SPARKCLR_JAR_PATH")
printErrorAndExit(s"No remote sparkclr jar found; " +
s"please specify one with option $REMOTE_SPARKCLR_JAR_PATH")
}
if (!remoteSparkCLRJarPath.toLowerCase.startsWith("hdfs://")) {
@ -385,7 +399,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
case "cluster" => {
cmd += (s" --class $csharpRunnerClass $sparkCLRJarPath " + zippedPrimaryResource.getName)
cmd += (s" --class $csharpRunnerClass $sparkCLRJarPath "
+ zippedPrimaryResource.getName)
}
case _ =>
@ -413,7 +428,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
case pr if pr.endsWith(".zip") => {
deployMode match {
case "cluster" =>
case _ => mainExecutable = new File(new File(primaryResource).getAbsoluteFile.getParent, mainExecutable).getPath
case _ => mainExecutable =
new File(new File(primaryResource).getAbsoluteFile.getParent, mainExecutable).getPath
}
}
@ -422,8 +438,9 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
/**
* In order not to miss any driver dependencies, all files under user driver directory (SparkCLR DLLs and CSharpWorker.exe should also be included)
* will assembled into a zip file and shipped by --file parameter
* In order not to miss any driver dependencies, all files under user driver directory (SparkCLR
* DLLs and CSharpWorker.exe should also be included) will assembled into a zip file and shipped
* by --file parameter
* @return
*/
private def zipPrimaryResource(): File = {
@ -433,12 +450,14 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
primaryResource match {
case pr if new File(pr).isDirectory => {
zippedResource = new File(System.getProperty("java.io.tmpdir"), new File(primaryResource).getName + "_" + System.currentTimeMillis() + ".zip")
zippedResource = new File(System.getProperty("java.io.tmpdir"),
new File(primaryResource).getName + "_" + System.currentTimeMillis() + ".zip")
CSharpUtils.zip(new File(primaryResource), zippedResource)
}
case pr if pr.endsWith(".exe") => {
zippedResource = new File(System.getProperty("java.io.tmpdir"), System.currentTimeMillis() + ".zip")
zippedResource = new File(System.getProperty("java.io.tmpdir"),
System.currentTimeMillis() + ".zip")
CSharpUtils.zip(new File(primaryResource).getParentFile, zippedResource)
}
@ -448,7 +467,8 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
}
if (zippedResource != null && !primaryResource.endsWith(".zip")) {
SparkSubmit.printStream.println("Zip driver directory " + new File(primaryResource).getAbsolutePath + " to " + zippedResource.getPath)
SparkSubmit.printStream.println("Zip driver directory "
+ new File(primaryResource).getAbsolutePath + " to " + zippedResource.getPath)
}
zippedResource
@ -564,3 +584,4 @@ class SparkCLRSubmitArguments(args: Seq[String], env: Map[String, String], exitF
exitFn(exitCode)
}
}
// scalastyle:on println
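The comment above explains why SparkCLRSubmitArguments lives in org.apache.spark.launcher: the inherited opts table is final, so instead of mutating it the class builds a fresh options array for parsing, dropping --class and adding SparkCLR-specific switches. A hedged sketch of that construction (the real table comes from SparkSubmitOptionParser; the entries and the second added option name here are illustrative):

// Each row of the table lists the aliases of one command-line option.
val inheritedOpts: Array[Array[String]] =
  Array(Array("--master"), Array("--deploy-mode"), Array("--class"))

// Build a new table instead of modifying the final one: remove --class and
// add the SparkCLR-only options (--exe is from the diff; the jar option name is assumed).
val options: Array[Array[String]] =
  inheritedOpts.filterNot(_.contains("--class")) ++
    Array(Array("--exe"), Array("--remote-sparkclr-jar"))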

View file

@ -1,5 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.sql.api.csharp
@ -142,7 +144,11 @@ object SQLUtils {
sqlContext.read.format(source).schema(schema).load()
}
def loadTextFile(sqlContext: SQLContext, path: String, hasHeader: java.lang.Boolean, inferSchema: java.lang.Boolean) : DataFrame = {
def loadTextFile(
sqlContext: SQLContext,
path: String,
hasHeader: java.lang.Boolean,
inferSchema: java.lang.Boolean) : DataFrame = {
var dfReader = sqlContext.read.format("com.databricks.spark.csv")
if (hasHeader)
{
@ -155,7 +161,11 @@ object SQLUtils {
dfReader.load(path)
}
def loadTextFile(sqlContext: SQLContext, path: String, delimiter: String, schemaJson: String) : DataFrame = {
def loadTextFile(
sqlContext: SQLContext,
path: String,
delimiter: String,
schemaJson: String) : DataFrame = {
val stringRdd = sqlContext.sparkContext.textFile(path)
val schema = createSchema(schemaJson)
@ -164,36 +174,38 @@ object SQLUtils {
val columns = s.split(delimiter)
columns.length match {
case 1 => RowFactory.create(columns(0))
case 2 => RowFactory.create(columns(0),columns(1))
case 3 => RowFactory.create(columns(0),columns(1),columns(2))
case 4 => RowFactory.create(columns(0),columns(1),columns(2),columns(3))
case 5 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4))
case 6 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5))
case 7 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6))
case 8 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7))
case 9 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8))
case 10 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9))
case 11 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10))
case 12 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11))
case 13 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12))
case 14 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13))
case 15 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14))
case 16 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15))
case 17 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16))
case 18 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17))
case 19 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18))
case 20 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19))
case 21 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20))
case 22 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21))
case 23 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22))
case 24 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23))
case 25 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24))
case 26 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24),columns(25))
case 27 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24),columns(25),columns(26))
case 28 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24),columns(25),columns(26),columns(27))
case 29 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24),columns(25),columns(26),columns(27),columns(28))
case 30 => RowFactory.create(columns(0),columns(1),columns(2),columns(3),columns(4),columns(5),columns(6),columns(7),columns(8),columns(9),columns(10),columns(11),columns(12),columns(13),columns(14),columns(15),columns(16),columns(17),columns(18),columns(19),columns(20),columns(21),columns(22),columns(23),columns(24),columns(25),columns(26),columns(27),columns(28),columns(29))
case 2 => RowFactory.create(columns(0), columns(1))
case 3 => RowFactory.create(columns(0), columns(1), columns(2))
case 4 => RowFactory.create(columns(0), columns(1), columns(2), columns(3))
case 5 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4))
// scalastyle:off
case 6 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5))
case 7 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6))
case 8 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7))
case 9 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8))
case 10 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9))
case 11 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10))
case 12 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11))
case 13 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12))
case 14 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13))
case 15 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14))
case 16 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15))
case 17 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16))
case 18 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17))
case 19 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18))
case 20 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19))
case 21 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20))
case 22 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21))
case 23 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22))
case 24 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23))
case 25 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24))
case 26 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24), columns(25))
case 27 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24), columns(25), columns(26))
case 28 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24), columns(25), columns(26), columns(27))
case 29 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24), columns(25), columns(26), columns(27), columns(28))
case 30 => RowFactory.create(columns(0), columns(1), columns(2), columns(3), columns(4), columns(5), columns(6), columns(7), columns(8), columns(9), columns(10), columns(11), columns(12), columns(13), columns(14), columns(15), columns(16), columns(17), columns(18), columns(19), columns(20), columns(21), columns(22), columns(23), columns(24), columns(25), columns(26), columns(27), columns(28), columns(29))
case _ => throw new Exception("Text files with more than 30 columns currently not supported") //TODO - if requirement comes up, generate code for additional columns
// scalastyle:on
}
}
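The TODO above notes that supporting more than 30 columns would mean generating still more cases. Since RowFactory.create is a Java varargs method, one possible simplification (a sketch only, not what this commit does) would collapse the whole match into a single call:

import org.apache.spark.sql.{Row, RowFactory}

// Widen the split columns to AnyRef and pass them as varargs; handles any column count.
def rowFromColumns(columns: Array[String]): Row =
  RowFactory.create(columns.map(c => c: AnyRef): _*)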

View file

@ -20,11 +20,10 @@ package org.apache.spark.streaming.api.csharp
import org.apache.spark.api.csharp._
import org.apache.spark.api.csharp.SerDe._
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream
import java.net.Socket
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._
import scala.language.existentials
@ -78,7 +77,7 @@ object CSharpDStream {
// log exception only when callback socket is not shutdown explicitly
if (!CSharpBackend.callbackSocketShutdown) {
// TODO: change println to log
System.err.println("CSharp transform callback failed with " + e)
System.err.println("CSharp transform callback failed with " + e) // scalastyle:off println
e.printStackTrace()
}

View file

@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.util.csharp
import java.io._
@ -85,7 +86,7 @@ object Utils {
* @param targetDir target directory
*/
def unzip(file: File, targetDir: File): Unit = {
if(!targetDir.exists()){
if (!targetDir.exists()){
targetDir.mkdir()
}
@ -121,7 +122,9 @@ object Utils {
*/
def exit(status: Int, maxDelayMillis: Long) {
try {
// scalastyle:off println
println(s"Utils.exit() with status: $status, maxDelayMillis: $maxDelayMillis")
// scalastyle:on println
// setup a timer, so if nice exit fails, the nasty exit happens
val timer = new Timer()
@ -133,7 +136,7 @@ object Utils {
}, maxDelayMillis)
// try to exit nicely
System.exit(status);
} catch {
} catch {
// exit nastily if we have a problem
case ex: Throwable => Runtime.getRuntime.halt(status)
} finally {

View file

@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.api.csharp
import org.apache.spark.csharp.SparkCLRFunSuite

View file

@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.csharp
import org.apache.spark.Logging

View file

@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.launcher
import java.io.{File, OutputStream, PrintStream}

View file

@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
/*
* Copyright (c) Microsoft. All rights reserved.
* Licensed under the MIT license. See LICENSE file in the project root for full license information.
*/
package org.apache.spark.util.csharp
import java.io._