Use logInfo and logError method instead of println (#556)

* Use logInfo and logError method insted of println

Finish TODO "logError does not work now"

* remove redundant printStackTrace method
This commit is contained in:
Yun Tang 2016-09-10 05:20:34 +08:00 коммит произвёл Hebin Huang
Родитель da54d509ed
Коммит 9a37ec84fd
5 изменённых файлов: 40 добавлений и 60 удалений

Просмотреть файл

@ -7,15 +7,16 @@ package org.apache.spark.api.csharp
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue, TimeUnit}
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ChannelInitializer, EventLoopGroup, ChannelFuture}
import io.netty.channel.{ChannelFuture, ChannelInitializer, EventLoopGroup}
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
import org.apache.spark.internal.Logging
/**
@ -24,9 +25,10 @@ import io.netty.handler.codec.bytes.{ByteArrayDecoder, ByteArrayEncoder}
* This implementation is identical to RBackend and that can be reused
* in SparkCLR if the handler is made pluggable
*/
// Since SparkCLR is a package to Spark and not a part of spark-core it mirrors the implementation of
// selected parts from RBackend with SparkCLR customizations
class CSharpBackend { self => // for accessing the this reference in inner class(ChannelInitializer)
// Since SparkCLR is a package to Spark and not a part of spark-core it mirrors the implementation
// of selected parts from RBackend with SparkCLR customizations
class CSharpBackend extends Logging
{ self => // for accessing the this reference in inner class(ChannelInitializer)
private[this] var channelFuture: ChannelFuture = null
private[this] var bootstrap: ServerBootstrap = null
private[this] var bossGroup: EventLoopGroup = null
@ -82,7 +84,7 @@ class CSharpBackend { self => // for accessing the this reference in inner class
bootstrap = null
// Send close to CSharp callback server.
println("Requesting to close all call back sockets.")
logInfo("Requesting to close all call back sockets.")
var socket: Socket = null
do {
socket = CSharpBackend.callbackSockets.poll()
@ -94,7 +96,7 @@ class CSharpBackend { self => // for accessing the this reference in inner class
socket = null
}
catch {
case e : Exception => println("Exception when closing socket: " + e)
case e : Exception => logError("Exception when closing socket: ", e)
}
}
} while (socket != null)

Просмотреть файл

@ -6,11 +6,12 @@
package org.apache.spark.api.csharp
import org.apache.spark.util.Utils
import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream, ByteArrayInputStream}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.net.Socket
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import org.apache.spark.internal.Logging
// TODO - work with SparkR devs to make this configurable and reuse RBackendHandler
import org.apache.spark.api.csharp.SerDe._
@ -24,7 +25,8 @@ import scala.collection.mutable.HashMap
*/
// Since SparkCLR is a package to Spark and not a part of spark-core, it mirrors the implementation
// of selected parts from RBackend with SparkCLR customizations
class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHandler[Array[Byte]] {
class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHandler[Array[Byte]]
with Logging{
override def channelRead0(ctx: ChannelHandlerContext, msg: Array[Byte]): Unit = {
val reply = handleBackendRequest(msg)
@ -71,15 +73,13 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
val t = readObjectType(dis)
assert(t == 'i')
val port = readInt(dis)
// scalastyle:off println
println("[CSharpBackendHandler] Connecting to a callback server at port " + port)
logInfo(s"Connecting to a callback server at port $port")
CSharpBackend.callbackPort = port
writeInt(dos, 0)
writeType(dos, "void")
case "closeCallback" =>
// Send close to CSharp callback server.
println("[CSharpBackendHandler] Requesting to close all call back sockets.")
// scalastyle:on
logInfo("Requesting to close all call back sockets.")
var socket: Socket = null
do {
socket = CSharpBackend.callbackSockets.poll()
@ -91,7 +91,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
socket = null
}
catch {
case e: Exception => println("Exception when closing socket: " + e)
case e: Exception => logError("Exception when closing socket: ", e)
}
}
} while (socket != null)
@ -111,10 +111,7 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
// Close the connection when an exception is raised.
// scalastyle:off println
println("Exception caught: " + cause.getMessage)
// scalastyle:on
cause.printStackTrace()
logError("Exception caught: ", cause)
ctx.close()
}
@ -176,31 +173,26 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
}
} catch {
case e: Exception =>
// TODO - logError does not work now..fix //logError(s"$methodName on $objId failed", e)
val jvmObj = JVMObjectTracker.get(objId)
val jvmObjName = jvmObj match {
case Some(jObj) => jObj.getClass.getName
case None => "NullObject"
}
// scalastyle:off println
println(s"[CSharpBackendHandler] $methodName on object of type $jvmObjName failed")
println(e.getMessage)
println(e.printStackTrace())
logError(s"On object of type $jvmObjName failed", e)
if (methods != null) {
println("methods:")
methods.foreach(println(_))
logError("methods:")
methods.foreach(m => logError(m.toString))
}
if (args != null) {
println("args:")
logError("args:")
args.foreach(arg => {
if (arg != null) {
println("argType: " + arg.getClass.getCanonicalName + ", argValue: " + arg)
logError(s"argType: ${arg.getClass.getCanonicalName}, argValue: $arg")
} else {
println("arg: NULL")
logError("arg: NULL")
}
})
}
// scalastyle:on println
writeInt(dos, -1)
writeString(dos, Utils.exceptionString(e.getCause))
}
@ -254,16 +246,6 @@ class CSharpBackendHandler(server: CSharpBackend) extends SimpleChannelInboundHa
true
}
// scalastyle:off println
def logError(id: String) {
println(id)
}
def logWarning(id: String) {
println(id)
}
// scalastyle:on println
def logError(id: String, e: Exception): Unit = {

Просмотреть файл

@ -181,7 +181,7 @@ class CSharpRDD(
case e: OverlappingFileLockException =>
logInfo("Already obtained the lock.")
waitUnzipOperationDone(doneFlag)
case e: Exception => e.printStackTrace()
case e: Exception => logError("Exception when unzipping cSharpWorkerWorkingDir", e)
}
finally {
if (lock != null && lock.isValid) lock.release()

Просмотреть файл

@ -14,6 +14,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.spark.api.csharp.CSharpBackend
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil, SparkSubmitArguments}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{RedirectThread, Utils}
import org.apache.spark.util.csharp.{Utils => CSharpSparkUtils}
@ -22,8 +23,7 @@ import org.apache.spark.util.csharp.{Utils => CSharpSparkUtils}
* gets its port number and launches C# process passing the port number to it.
* The runner implementation is mostly identical to RRunner with SparkCLR-specific customizations.
*/
// scalastyle:off println
object CSharpRunner {
object CSharpRunner extends Logging{
val MOBIUS_DEBUG_PORT = 5567
def main(args: Array[String]): Unit = {
@ -51,7 +51,7 @@ object CSharpRunner {
zipFileName = downloadDriverFile(zipFileName, driverDir.getAbsolutePath).getName
}
println(s"[CSharpRunner.main] Unzipping driver $zipFileName in $driverDir")
logInfo(s"Unzipping driver $zipFileName in $driverDir")
CSharpSparkUtils.unzip(new File(zipFileName), driverDir)
// reusing windows-specific formatting in PythonRunner
csharpExecutable = PythonRunner.formatPath(args(1))
@ -74,7 +74,7 @@ object CSharpRunner {
processParameters.add(formatPath(csharpExecutable))
otherArgs.foreach( arg => processParameters.add(arg) )
println("[CSharpRunner.main] Starting CSharpBackend!")
logInfo("Starting CSharpBackend!")
// Time to wait for CSharpBackend to initialize in seconds
val backendTimeout = sys.env.getOrElse("CSHARPBACKEND_TIMEOUT", "120").toInt
@ -88,8 +88,7 @@ object CSharpRunner {
// need to get back csharpBackendPortNumber because if the value passed to init is 0
// the port number is dynamically assigned in the backend
csharpBackendPortNumber = csharpBackend.init(csharpBackendPortNumber)
println("[CSharpRunner.main] Port number used by CSharpBackend is "
+ csharpBackendPortNumber) // TODO - send to logger also
logInfo(s"Port number used by CSharpBackend is $csharpBackendPortNumber")
initialized.release()
csharpBackend.run()
}
@ -107,8 +106,7 @@ object CSharpRunner {
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
env.put(key, value)
println("[CSharpRunner.main] adding key=" + key
+ " and value=" + value + " to environment")
logInfo(s"Adding key=$key and value=$value to environment")
}
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
@ -123,22 +121,23 @@ object CSharpRunner {
closeBackend(csharpBackend)
} catch {
case t: Throwable =>
println("[CSharpRunner.main]" + t.getMessage + "\n" + t.getStackTrace)
logError(s"${t.getMessage} \n ${t.getStackTrace}")
}
println("[CSharpRunner.main] Return CSharpBackend code " + returnCode)
logInfo(s"Return CSharpBackend code $returnCode")
CSharpSparkUtils.exit(returnCode)
} else {
// scalastyle:off println
println("***********************************************************************")
println("* [CSharpRunner.main] Backend running debug mode. Press enter to exit *")
println("***********************************************************************")
// scalastyle:on println
Console.readLine()
closeBackend(csharpBackend)
CSharpSparkUtils.exit(0)
}
} else {
println("[CSharpRunner.main] CSharpBackend did not initialize in "
+ backendTimeout + " seconds")
logError(s"CSharpBackend did not initialize in $backendTimeout seconds")
CSharpSparkUtils.exit(-1)
}
}
@ -168,7 +167,7 @@ object CSharpRunner {
val localFile = new File(driverDir, jarFileName)
if (!localFile.exists()) { // May already exist if running multiple workers on one node
println(s"Copying user file $filePath to $driverDir")
logInfo(s"Copying user file $filePath to $driverDir")
Utils.fetchFile(
hdfsFilePath,
new File(driverDir),
@ -187,7 +186,7 @@ object CSharpRunner {
}
def closeBackend(csharpBackend: CSharpBackend): Unit = {
println("[CSharpRunner.main] closing CSharpBackend")
logInfo("Closing CSharpBackend")
csharpBackend.close()
}
@ -205,4 +204,3 @@ object CSharpRunner {
(runInDebugMode, portNumber)
}
}
// scalastyle:on println

Просмотреть файл

@ -29,7 +29,7 @@ import org.apache.spark.streaming.api.java._
import scala.language.existentials
object CSharpDStream {
object CSharpDStream extends Logging{
// Variables for debugging
var debugMode = false
@ -78,9 +78,7 @@ object CSharpDStream {
case e: Exception =>
// log exception only when callback socket is not shutdown explicitly
if (!CSharpBackend.callbackSocketShutdown) {
// TODO: change println to log
System.err.println("CSharp transform callback failed with " + e) // scalastyle:off println
e.printStackTrace()
logError(s"CSharp transform callback failed", e)
}
// close this socket if error happen
@ -89,7 +87,7 @@ object CSharpDStream {
socket.close()
}
catch {
case e: Exception => println("Exception when closing socket: " + e)
case e: Exception => logError("Exception when closing socket", e)
}
}