diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs index cce66d4d..539b41ec 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/DataStreamWriterTests.cs @@ -87,7 +87,9 @@ namespace Microsoft.Spark.E2ETest.IpcTests .WriteStream() .Format("parquet") .Option("checkpointLocation", tempDirectory.Path); - Assert.IsType(dsw.ToTable(tableName)); + + StreamingQuery sq = dsw.ToTable(tableName); + sq.Stop(); }); } diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryManagerTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryManagerTests.cs index a9badc5a..37e827c7 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryManagerTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryManagerTests.cs @@ -31,20 +31,18 @@ namespace Microsoft.Spark.E2ETest.IpcTests var intMemoryStream = new MemoryStream(_spark); StreamingQuery sq1 = intMemoryStream .ToDF().WriteStream().QueryName("intQuery").Format("console").Start(); - string id1 = sq1.Id; var stringMemoryStream = new MemoryStream(_spark); StreamingQuery sq2 = stringMemoryStream .ToDF().WriteStream().QueryName("stringQuery").Format("console").Start(); - string id2 = sq2.Id; StreamingQueryManager sqm = _spark.Streams(); StreamingQuery[] streamingQueries = sqm.Active().ToArray(); Assert.Equal(2, streamingQueries.Length); - Assert.IsType(sqm.Get(id1)); - Assert.IsType(sqm.Get(id2)); + Assert.IsType(sqm.Get(sq1.Id)); + Assert.IsType(sqm.Get(sq2.Id)); sqm.ResetTerminated(); diff --git a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryTests.cs b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryTests.cs index ed6fcc74..8715ee54 100644 --- a/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryTests.cs +++ b/src/csharp/Microsoft.Spark.E2ETest/IpcTests/Sql/Streaming/StreamingQueryTests.cs @@ -29,7 +29,15 @@ namespace Microsoft.Spark.E2ETest.IpcTests { var intMemoryStream = new MemoryStream(_spark); StreamingQuery sq = intMemoryStream - .ToDF().WriteStream().QueryName("testQuery").Format("console").Start(); + .ToDF() + .WriteStream() + .QueryName("testQuery") + .Format("console") + .Trigger(Trigger.Once()) + .Start(); + + sq.AwaitTermination(); + Assert.IsType(sq.AwaitTermination(10)); Assert.IsType(sq.Name); @@ -39,8 +47,6 @@ namespace Microsoft.Spark.E2ETest.IpcTests Assert.IsType(sq.IsActive()); - Assert.IsType(sq.AwaitTermination(10)); - sq.Explain(); Assert.Null(sq.Exception());