Merge pull request #359 from qintao1976/dev

In MultiThreadWorker mode, does not let the whole worker process exit when some TaskRunner fails
This commit is contained in:
Tao Qin 2016-03-25 09:43:36 +08:00
Родитель 2e3381be53 e4001e68ce
Коммит a4a3486f01
4 изменённых файлов: 9 добавлений и 8 удалений

Просмотреть файл

@ -57,7 +57,7 @@ namespace Microsoft.Spark.CSharp
// can not initialize logger earlier to avoid unwanted stdout ouput
InitializeLogger();
logger.LogInfo("RunDaemonWorker ...");
logger.LogInfo("Run MultiThreadWorker ...");
logger.LogDebug("Port number used to pipe in/out data between JVM and CLR {0}", localPort);
Worker.PrintFiles();

Просмотреть файл

@ -92,9 +92,9 @@ namespace Microsoft.Spark.CSharp
}
catch (Exception e)
{
logger.LogError(string.Format("TaskRunner [{0}] exeption, will exit", trId));
stop = true;
logger.LogError(string.Format("TaskRunner [{0}] exeption, will dispose this TaskRunner", trId));
logger.LogException(e);
Environment.Exit(-1);
}
finally
{

Просмотреть файл

@ -249,7 +249,7 @@ namespace Microsoft.Spark.CSharp
var funcProcessWatch = new Stopwatch();
commandProcessWatch.Start();
ReadDiagnosticsInfo(networkStream);
int stageId = ReadDiagnosticsInfo(networkStream);
string deserializerMode = SerDe.ReadString(networkStream);
logger.LogDebug("Deserializer mode: " + deserializerMode);
@ -322,7 +322,7 @@ namespace Microsoft.Spark.CSharp
// log statistics
inputEnumerator.LogStatistic();
logger.LogInfo(string.Format("func process time: {0}", funcProcessWatch.ElapsedMilliseconds));
logger.LogInfo(string.Format("command process time: {0}", commandProcessWatch.ElapsedMilliseconds));
logger.LogInfo(string.Format("stage {0}, command process time: {1}", stageId, commandProcessWatch.ElapsedMilliseconds));
}
else
{
@ -390,12 +390,13 @@ namespace Microsoft.Spark.CSharp
}
private static void ReadDiagnosticsInfo(NetworkStream networkStream)
private static int ReadDiagnosticsInfo(NetworkStream networkStream)
{
int rddId = SerDe.ReadInt(networkStream);
int stageId = SerDe.ReadInt(networkStream);
int partitionId = SerDe.ReadInt(networkStream);
logger.LogInfo(string.Format("rddInfo: rddId {0}, stageId {1}, partitionId {2}", rddId, stageId, partitionId));
return stageId;
}
private static void WriteDiagnosticsInfo(NetworkStream networkStream, DateTime bootTime, DateTime initTime)

Просмотреть файл

@ -333,7 +333,7 @@ namespace WorkerTest
s.Write(command, 0, command.Length / 2);
}
AssertWorker(worker, -1, "System.ArgumentException: Incomplete bytes read: ");
AssertWorker(worker, 0, "System.ArgumentException: Incomplete bytes read: ");
CSharpRDD_SocketServer.Stop();
}
@ -367,7 +367,7 @@ namespace WorkerTest
Assert.AreEqual(100, count);
}
AssertWorker(worker, -1, "System.NullReferenceException: Object reference not set to an instance of an object.");
AssertWorker(worker, 0, "System.NullReferenceException: Object reference not set to an instance of an object.");
CSharpRDD_SocketServer.Stop();
}