Capture dotnet application error stack trace (#1047)

This commit is contained in:
kalamut 2022-04-21 18:57:54 -04:00 коммит произвёл GitHub
Родитель b2fa3508d7
Коммит 0f5159069b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 252 добавлений и 28 удалений

Просмотреть файл

@@ -0,0 +1,22 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package org.apache.spark.deploy.dotnet
import org.apache.spark.SparkException
/**
 * Exception describing the failure of a .NET user application, optionally
 * carrying the stack trace extracted from the application's log output.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

Просмотреть файл

@@ -14,14 +14,18 @@ import java.util.Locale
import java.util.concurrent.{Semaphore, TimeUnit}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.hadoop.fs.Path
import org.apache.spark
import org.apache.spark.api.dotnet.DotnetBackend
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
import org.apache.spark.internal.config.dotnet.Dotnet.{
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
}
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
import org.apache.spark.util.{RedirectThread, Utils}
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
import scala.collection.JavaConverters._
@@ -123,6 +127,17 @@ object DotnetRunner extends Logging {
if (!runInDebugMode) {
var returnCode = -1
var process: Process = null
val enableLogRedirection: Boolean = sys.props
.getOrElse(
ERROR_REDIRECITON_ENABLED.key,
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
case true => new CircularBuffer(
sys.props.getOrElse(
ERROR_BUFFER_SIZE.key,
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
}
try {
val builder = new ProcessBuilder(processParameters)
val env = builder.environment()
@@ -137,9 +152,15 @@ object DotnetRunner extends Logging {
// Redirect stdin of JVM process to stdin of .NET process.
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
// Redirect stdout and stderr of .NET process.
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
// Redirect stdout and stderr of .NET process to System.out and to buffer
// if log direction is enabled. If not, redirect only to System.out.
new RedirectThread(
process.getInputStream,
stderrBuffer match {
case Some(buffer) => new TeeOutputStream(System.out, buffer)
case _ => System.out
},
"redirect .NET stdout and stderr").start()
process.waitFor()
} catch {
@@ -149,9 +170,12 @@ object DotnetRunner extends Logging {
returnCode = closeDotnetProcess(process)
closeBackend(dotnetBackend)
}
if (returnCode != 0) {
throw new SparkUserAppException(returnCode)
if (stderrBuffer.isDefined) {
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
} else {
throw new SparkUserAppException(returnCode)
}
} else {
logInfo(s".NET application exited successfully")
}

Просмотреть файл

@@ -15,4 +15,14 @@ private[spark] object Dotnet {
// Opt-out flag for the Spark patch-version compatibility check
// (key: spark.dotnet.ignoreSparkPatchVersionCheck). Defaults to false.
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
.createWithDefault(false)
// Enables tee-ing the .NET application's output into an in-memory buffer
// so a stack trace can be attached to the failure exception.
// NOTE(review): "REDIRECITON" misspells "REDIRECTION"; the identifier is
// referenced from other files, so rename across all call sites at once.
val ERROR_REDIRECITON_ENABLED =
ConfigBuilder("spark.nonjvm.error.forwarding.enabled").booleanConf
.createWithDefault(false)
// Capacity of the CircularBuffer retaining .NET process output for error
// reporting; must be non-negative. Presumably measured in characters —
// TODO confirm against CircularBuffer's constructor.
val ERROR_BUFFER_SIZE =
ConfigBuilder("spark.nonjvm.error.buffer.size")
.intConf
.checkValue(_ >= 0, "The error buffer size must not be negative")
.createWithDefault(10240)
}

Просмотреть файл

@@ -0,0 +1,22 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package org.apache.spark.deploy.dotnet
import org.apache.spark.SparkException
/**
 * Exception describing the failure of a .NET user application, optionally
 * carrying the stack trace extracted from the application's log output.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

Просмотреть файл

@@ -14,14 +14,18 @@ import java.util.Locale
import java.util.concurrent.{Semaphore, TimeUnit}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.hadoop.fs.Path
import org.apache.spark
import org.apache.spark.api.dotnet.DotnetBackend
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
import org.apache.spark.internal.config.dotnet.Dotnet.{
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
}
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
import org.apache.spark.util.{RedirectThread, Utils}
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
import scala.collection.JavaConverters._
@@ -122,6 +126,17 @@ object DotnetRunner extends Logging {
if (!runInDebugMode) {
var returnCode = -1
var process: Process = null
val enableLogRedirection: Boolean = sys.props
.getOrElse(
ERROR_REDIRECITON_ENABLED.key,
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
case true => new CircularBuffer(
sys.props.getOrElse(
ERROR_BUFFER_SIZE.key,
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
}
try {
val builder = new ProcessBuilder(processParameters)
val env = builder.environment()
@@ -136,9 +151,15 @@ object DotnetRunner extends Logging {
// Redirect stdin of JVM process to stdin of .NET process.
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
// Redirect stdout and stderr of .NET process.
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
// Redirect stdout and stderr of .NET process to System.out and to buffer
// if log direction is enabled. If not, redirect only to System.out.
new RedirectThread(
process.getInputStream,
stderrBuffer match {
case Some(buffer) => new TeeOutputStream(System.out, buffer)
case _ => System.out
},
"redirect .NET stdout and stderr").start()
process.waitFor()
} catch {
@@ -148,9 +169,12 @@ object DotnetRunner extends Logging {
returnCode = closeDotnetProcess(process)
closeBackend(dotnetBackend)
}
if (returnCode != 0) {
throw new SparkUserAppException(returnCode)
if (stderrBuffer.isDefined) {
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
} else {
throw new SparkUserAppException(returnCode)
}
} else {
logInfo(s".NET application exited successfully")
}

Просмотреть файл

@@ -15,4 +15,14 @@ private[spark] object Dotnet {
// Opt-out flag for the Spark patch-version compatibility check
// (key: spark.dotnet.ignoreSparkPatchVersionCheck). Defaults to false.
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
.createWithDefault(false)
// Enables tee-ing the .NET application's output into an in-memory buffer
// so a stack trace can be attached to the failure exception.
// NOTE(review): "REDIRECITON" misspells "REDIRECTION"; the identifier is
// referenced from other files, so rename across all call sites at once.
val ERROR_REDIRECITON_ENABLED =
ConfigBuilder("spark.nonjvm.error.forwarding.enabled").booleanConf
.createWithDefault(false)
// Capacity of the CircularBuffer retaining .NET process output for error
// reporting; must be non-negative. Presumably measured in characters —
// TODO confirm against CircularBuffer's constructor.
val ERROR_BUFFER_SIZE =
ConfigBuilder("spark.nonjvm.error.buffer.size")
.intConf
.checkValue(_ >= 0, "The error buffer size must not be negative")
.createWithDefault(10240)
}

Просмотреть файл

@@ -0,0 +1,22 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package org.apache.spark.deploy.dotnet
import org.apache.spark.SparkException
/**
 * Exception describing the failure of a .NET user application, optionally
 * carrying the stack trace extracted from the application's log output.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

Просмотреть файл

@@ -14,14 +14,18 @@ import java.util.Locale
import java.util.concurrent.{Semaphore, TimeUnit}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.hadoop.fs.Path
import org.apache.spark
import org.apache.spark.api.dotnet.DotnetBackend
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
import org.apache.spark.internal.config.dotnet.Dotnet.{
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
}
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
import org.apache.spark.util.{RedirectThread, Utils}
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
import scala.collection.JavaConverters._
@@ -122,6 +126,17 @@ object DotnetRunner extends Logging {
if (!runInDebugMode) {
var returnCode = -1
var process: Process = null
val enableLogRedirection: Boolean = sys.props
.getOrElse(
ERROR_REDIRECITON_ENABLED.key,
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
case true => new CircularBuffer(
sys.props.getOrElse(
ERROR_BUFFER_SIZE.key,
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
}
try {
val builder = new ProcessBuilder(processParameters)
val env = builder.environment()
@@ -136,9 +151,15 @@ object DotnetRunner extends Logging {
// Redirect stdin of JVM process to stdin of .NET process.
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
// Redirect stdout and stderr of .NET process.
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
// Redirect stdout and stderr of .NET process to System.out and to buffer
// if log direction is enabled. If not, redirect only to System.out.
new RedirectThread(
process.getInputStream,
stderrBuffer match {
case Some(buffer) => new TeeOutputStream(System.out, buffer)
case _ => System.out
},
"redirect .NET stdout and stderr").start()
process.waitFor()
} catch {
@@ -148,9 +169,12 @@ object DotnetRunner extends Logging {
returnCode = closeDotnetProcess(process)
closeBackend(dotnetBackend)
}
if (returnCode != 0) {
throw new SparkUserAppException(returnCode)
if (stderrBuffer.isDefined) {
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
} else {
throw new SparkUserAppException(returnCode)
}
} else {
logInfo(s".NET application exited successfully")
}

Просмотреть файл

@@ -15,4 +15,14 @@ private[spark] object Dotnet {
// Opt-out flag for the Spark patch-version compatibility check
// (key: spark.dotnet.ignoreSparkPatchVersionCheck). Defaults to false.
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
.createWithDefault(false)
// Enables tee-ing the .NET application's output into an in-memory buffer
// so a stack trace can be attached to the failure exception.
// NOTE(review): "REDIRECITON" misspells "REDIRECTION"; the identifier is
// referenced from other files, so rename across all call sites at once.
val ERROR_REDIRECITON_ENABLED =
ConfigBuilder("spark.nonjvm.error.forwarding.enabled").booleanConf
.createWithDefault(false)
// Capacity of the CircularBuffer retaining .NET process output for error
// reporting; must be non-negative. Presumably measured in characters —
// TODO confirm against CircularBuffer's constructor.
val ERROR_BUFFER_SIZE =
ConfigBuilder("spark.nonjvm.error.buffer.size")
.intConf
.checkValue(_ >= 0, "The error buffer size must not be negative")
.createWithDefault(10240)
}

Просмотреть файл

@@ -0,0 +1,22 @@
/*
* Licensed to the .NET Foundation under one or more agreements.
* The .NET Foundation licenses this file to you under the MIT license.
* See the LICENSE file in the project root for more information.
*/
package org.apache.spark.deploy.dotnet
import org.apache.spark.SparkException
/**
 * Exception describing the failure of a .NET user application, optionally
 * carrying the stack trace extracted from the application's log output.
 *
 * @param exitCode Exit code returned by the .NET application.
 * @param dotNetStackTrace Stacktrace extracted from .NET application logs.
 */
private[spark] class DotNetUserAppException(exitCode: Int, dotNetStackTrace: Option[String])
  extends SparkException(
    dotNetStackTrace.fold(s"User application exited with $exitCode")(trace =>
      s"User application exited with $exitCode and .NET exception: $trace"))

Просмотреть файл

@@ -14,14 +14,18 @@ import java.util.Locale
import java.util.concurrent.{Semaphore, TimeUnit}
import org.apache.commons.io.FilenameUtils
import org.apache.commons.io.output.TeeOutputStream
import org.apache.hadoop.fs.Path
import org.apache.spark
import org.apache.spark.api.dotnet.DotnetBackend
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.dotnet.Dotnet.DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK
import org.apache.spark.internal.config.dotnet.Dotnet.{
DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK,
ERROR_BUFFER_SIZE, ERROR_REDIRECITON_ENABLED
}
import org.apache.spark.util.dotnet.{Utils => DotnetUtils}
import org.apache.spark.util.{RedirectThread, Utils}
import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
import org.apache.spark.{SecurityManager, SparkConf, SparkUserAppException}
import scala.collection.JavaConverters._
@@ -122,6 +126,17 @@ object DotnetRunner extends Logging {
if (!runInDebugMode) {
var returnCode = -1
var process: Process = null
val enableLogRedirection: Boolean = sys.props
.getOrElse(
ERROR_REDIRECITON_ENABLED.key,
ERROR_REDIRECITON_ENABLED.defaultValue.get.toString).toBoolean
val stderrBuffer: Option[CircularBuffer] = Option(enableLogRedirection).collect {
case true => new CircularBuffer(
sys.props.getOrElse(
ERROR_BUFFER_SIZE.key,
ERROR_BUFFER_SIZE.defaultValue.get.toString).toInt)
}
try {
val builder = new ProcessBuilder(processParameters)
val env = builder.environment()
@@ -136,9 +151,15 @@ object DotnetRunner extends Logging {
// Redirect stdin of JVM process to stdin of .NET process.
new RedirectThread(System.in, process.getOutputStream, "redirect JVM input").start()
// Redirect stdout and stderr of .NET process.
new RedirectThread(process.getInputStream, System.out, "redirect .NET stdout").start()
new RedirectThread(process.getErrorStream, System.out, "redirect .NET stderr").start()
// Redirect stdout and stderr of .NET process to System.out and to buffer
// if log direction is enabled. If not, redirect only to System.out.
new RedirectThread(
process.getInputStream,
stderrBuffer match {
case Some(buffer) => new TeeOutputStream(System.out, buffer)
case _ => System.out
},
"redirect .NET stdout and stderr").start()
process.waitFor()
} catch {
@@ -148,9 +169,12 @@ object DotnetRunner extends Logging {
returnCode = closeDotnetProcess(process)
closeBackend(dotnetBackend)
}
if (returnCode != 0) {
throw new SparkUserAppException(returnCode)
if (stderrBuffer.isDefined) {
throw new DotNetUserAppException(returnCode, Some(stderrBuffer.get.toString))
} else {
throw new SparkUserAppException(returnCode)
}
} else {
logInfo(s".NET application exited successfully")
}

Просмотреть файл

@@ -15,4 +15,14 @@ private[spark] object Dotnet {
// Opt-out flag for the Spark patch-version compatibility check
// (key: spark.dotnet.ignoreSparkPatchVersionCheck). Defaults to false.
val DOTNET_IGNORE_SPARK_PATCH_VERSION_CHECK =
ConfigBuilder("spark.dotnet.ignoreSparkPatchVersionCheck").booleanConf
.createWithDefault(false)
// Enables tee-ing the .NET application's output into an in-memory buffer
// so a stack trace can be attached to the failure exception.
// NOTE(review): "REDIRECITON" misspells "REDIRECTION"; the identifier is
// referenced from other files, so rename across all call sites at once.
val ERROR_REDIRECITON_ENABLED =
ConfigBuilder("spark.nonjvm.error.forwarding.enabled").booleanConf
.createWithDefault(false)
// Capacity of the CircularBuffer retaining .NET process output for error
// reporting; must be non-negative. Presumably measured in characters —
// TODO confirm against CircularBuffer's constructor.
val ERROR_BUFFER_SIZE =
ConfigBuilder("spark.nonjvm.error.buffer.size")
.intConf
.checkValue(_ >= 0, "The error buffer size must not be negative")
.createWithDefault(10240)
}