StreamingQueryManagerTests test fix (#907)

This commit is contained in:
Steve Suh 2021-04-19 18:44:07 -07:00 коммит произвёл GitHub
Родитель 5e6a142879
Коммит 1f1f736894
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 14 добавлений и 8 удалений

Просмотреть файл

@ -87,7 +87,9 @@ namespace Microsoft.Spark.E2ETest.IpcTests
.WriteStream()
.Format("parquet")
.Option("checkpointLocation", tempDirectory.Path);
Assert.IsType<StreamingQuery>(dsw.ToTable(tableName));
StreamingQuery sq = dsw.ToTable(tableName);
sq.Stop();
});
}

Просмотреть файл

@ -31,20 +31,18 @@ namespace Microsoft.Spark.E2ETest.IpcTests
var intMemoryStream = new MemoryStream<int>(_spark);
StreamingQuery sq1 = intMemoryStream
.ToDF().WriteStream().QueryName("intQuery").Format("console").Start();
string id1 = sq1.Id;
var stringMemoryStream = new MemoryStream<string>(_spark);
StreamingQuery sq2 = stringMemoryStream
.ToDF().WriteStream().QueryName("stringQuery").Format("console").Start();
string id2 = sq2.Id;
StreamingQueryManager sqm = _spark.Streams();
StreamingQuery[] streamingQueries = sqm.Active().ToArray();
Assert.Equal(2, streamingQueries.Length);
Assert.IsType<StreamingQuery>(sqm.Get(id1));
Assert.IsType<StreamingQuery>(sqm.Get(id2));
Assert.IsType<StreamingQuery>(sqm.Get(sq1.Id));
Assert.IsType<StreamingQuery>(sqm.Get(sq2.Id));
sqm.ResetTerminated();

Просмотреть файл

@ -29,7 +29,15 @@ namespace Microsoft.Spark.E2ETest.IpcTests
{
var intMemoryStream = new MemoryStream<int>(_spark);
StreamingQuery sq = intMemoryStream
.ToDF().WriteStream().QueryName("testQuery").Format("console").Start();
.ToDF()
.WriteStream()
.QueryName("testQuery")
.Format("console")
.Trigger(Trigger.Once())
.Start();
sq.AwaitTermination();
Assert.IsType<bool>(sq.AwaitTermination(10));
Assert.IsType<string>(sq.Name);
@ -39,8 +47,6 @@ namespace Microsoft.Spark.E2ETest.IpcTests
Assert.IsType<bool>(sq.IsActive());
Assert.IsType<bool>(sq.AwaitTermination(10));
sq.Explain();
Assert.Null(sq.Exception());