Test changes to improve test coverage

This commit is contained in:
dwnichols 2016-12-14 14:04:56 -05:00
Родитель 8eb8cd0b54
Коммит ed73082895
2 изменённых файлов: 15 добавлений и 5 удалений

Просмотреть файл

@ -89,13 +89,23 @@ namespace AdapterTest.Mocks
public IDStreamProxy CreateCSharpReducedWindowedDStream(IDStreamProxy jdstream, byte[] func, byte[] invFunc, int windowSeconds, int slideSeconds, string serializationMode)
{
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> f = (Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>)formatter.Deserialize(new MemoryStream(func));
RDD<dynamic> rdd = f(DateTime.UtcNow.Ticks,
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> f = (Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>) formatter.Deserialize(new MemoryStream(func));
var ticks = DateTime.UtcNow.Ticks;
RDD<dynamic> rdd = f(ticks,
new RDD<dynamic>((jdstream as MockDStreamProxy).rddProxy ?? new MockRddProxy(null), new SparkContext("", "")),
new RDD<dynamic>((jdstream as MockDStreamProxy).rddProxy ?? new MockRddProxy(null), new SparkContext("", "")));
return new MockDStreamProxy(rdd.RddProxy);
}
if (invFunc == null) return new MockDStreamProxy(rdd.RddProxy);
Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>> invf = (Func<double, RDD<dynamic>, RDD<dynamic>, RDD<dynamic>>) formatter.Deserialize(new MemoryStream(invFunc));
RDD<dynamic> invRdd = invf(ticks,
new RDD<dynamic>((jdstream as MockDStreamProxy).rddProxy ?? new MockRddProxy(null), new SparkContext("", "")),
new RDD<dynamic>((jdstream as MockDStreamProxy).rddProxy ?? new MockRddProxy(null), new SparkContext("", "")));
var difference = rdd.Subtract(invRdd);
return new MockDStreamProxy(difference.RddProxy);
}
public IDStreamProxy CreateCSharpStateDStream(IDStreamProxy jdstream, byte[] func, string className, string serializationMode, string serializationMode2)
{

Просмотреть файл

@ -32,7 +32,7 @@ namespace AdapterTest
var socketStream = ssc.SocketTextStream(IPAddress.Loopback.ToString(), 12345);
Assert.IsNotNull(socketStream.DStreamProxy);
var kafkaStream = KafkaUtils.CreateStream(ssc, IPAddress.Loopback + ":2181", "testGroupId", new [] { Tuple.Create("testTopic1", 1) }, new List<Tuple<string, string>>());
var kafkaStream = KafkaUtils.CreateStream(ssc, IPAddress.Loopback + ":2181", "testGroupId", new [] { Tuple.Create("testTopic1", 1) }, null);
Assert.IsNotNull(kafkaStream.DStreamProxy);
var directKafkaStream = KafkaUtils.CreateDirectStream(ssc, new List<string> { "testTopic2" }, new List<Tuple<string, string>>(), new List<Tuple<string, long>>());