diff --git a/csharp/Worker/Microsoft.Spark.CSharp/MultiThreadWorker.cs b/csharp/Worker/Microsoft.Spark.CSharp/MultiThreadWorker.cs index 85f8a84..6b47b37 100644 --- a/csharp/Worker/Microsoft.Spark.CSharp/MultiThreadWorker.cs +++ b/csharp/Worker/Microsoft.Spark.CSharp/MultiThreadWorker.cs @@ -57,7 +57,7 @@ namespace Microsoft.Spark.CSharp // can not initialize logger earlier to avoid unwanted stdout ouput InitializeLogger(); - logger.LogInfo("RunDaemonWorker ..."); + logger.LogInfo("Run MultiThreadWorker ..."); logger.LogDebug("Port number used to pipe in/out data between JVM and CLR {0}", localPort); Worker.PrintFiles(); diff --git a/csharp/Worker/Microsoft.Spark.CSharp/TaskRunner.cs b/csharp/Worker/Microsoft.Spark.CSharp/TaskRunner.cs index c521091..ec304ad 100644 --- a/csharp/Worker/Microsoft.Spark.CSharp/TaskRunner.cs +++ b/csharp/Worker/Microsoft.Spark.CSharp/TaskRunner.cs @@ -92,9 +92,9 @@ namespace Microsoft.Spark.CSharp } catch (Exception e) { - logger.LogError(string.Format("TaskRunner [{0}] exeption, will exit", trId)); + stop = true; + logger.LogError(string.Format("TaskRunner [{0}] exeption, will dispose this TaskRunner", trId)); logger.LogException(e); - Environment.Exit(-1); } finally { diff --git a/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs b/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs index f69df65..c3e0a49 100644 --- a/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs +++ b/csharp/Worker/Microsoft.Spark.CSharp/Worker.cs @@ -249,7 +249,7 @@ namespace Microsoft.Spark.CSharp var funcProcessWatch = new Stopwatch(); commandProcessWatch.Start(); - ReadDiagnosticsInfo(networkStream); + int stageId = ReadDiagnosticsInfo(networkStream); string deserializerMode = SerDe.ReadString(networkStream); logger.LogDebug("Deserializer mode: " + deserializerMode); @@ -322,7 +322,7 @@ namespace Microsoft.Spark.CSharp // log statistics inputEnumerator.LogStatistic(); logger.LogInfo(string.Format("func process time: {0}", funcProcessWatch.ElapsedMilliseconds)); - logger.LogInfo(string.Format("command process time: {0}", commandProcessWatch.ElapsedMilliseconds)); + logger.LogInfo(string.Format("stage {0}, command process time: {1}", stageId, commandProcessWatch.ElapsedMilliseconds)); } else { @@ -390,12 +390,13 @@ namespace Microsoft.Spark.CSharp } - private static void ReadDiagnosticsInfo(NetworkStream networkStream) + private static int ReadDiagnosticsInfo(NetworkStream networkStream) { int rddId = SerDe.ReadInt(networkStream); int stageId = SerDe.ReadInt(networkStream); int partitionId = SerDe.ReadInt(networkStream); logger.LogInfo(string.Format("rddInfo: rddId {0}, stageId {1}, partitionId {2}", rddId, stageId, partitionId)); + return stageId; } private static void WriteDiagnosticsInfo(NetworkStream networkStream, DateTime bootTime, DateTime initTime) diff --git a/csharp/WorkerTest/WorkerTest.cs b/csharp/WorkerTest/WorkerTest.cs index 21fddfb..80e6a5a 100644 --- a/csharp/WorkerTest/WorkerTest.cs +++ b/csharp/WorkerTest/WorkerTest.cs @@ -333,7 +333,7 @@ namespace WorkerTest s.Write(command, 0, command.Length / 2); } - AssertWorker(worker, -1, "System.ArgumentException: Incomplete bytes read: "); + AssertWorker(worker, 0, "System.ArgumentException: Incomplete bytes read: "); CSharpRDD_SocketServer.Stop(); } @@ -367,7 +367,7 @@ namespace WorkerTest Assert.AreEqual(100, count); } - AssertWorker(worker, -1, "System.NullReferenceException: Object reference not set to an instance of an object."); + AssertWorker(worker, 0, "System.NullReferenceException: Object reference not set to an instance of an object."); CSharpRDD_SocketServer.Stop(); }