This commit is contained in:
James Terwilliger 2019-07-07 15:30:08 -07:00
Родитель 7d28392cfa 048a4d0718
Коммит d92209b687
4 изменённых файлов: 66 добавлений и 29 удалений

Просмотреть файл

@ -2,6 +2,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
@ -10,7 +11,7 @@ namespace Microsoft.StreamProcessing.Provider
/// The core interface of the streaming provider API, representing a temporal or time-series stream of data
/// </summary>
/// <typeparam name="TPayload">The type of the payload of the underlying stream query.</typeparam>
public interface IQStreamable<out TPayload>
public interface IQStreamable<TPayload>
{
/// <summary>
/// The expression representing the full query so far in the pipeline.
@ -21,5 +22,13 @@ namespace Microsoft.StreamProcessing.Provider
/// The assigned provider whose job it is to evaluate the query once it is constructed.
/// </summary>
IQStreamableProvider Provider { get; }
/// <summary>
///
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
IObservable<TResult> ToTemporalObservable<TResult>(Expression<Func<long, long, TPayload, TResult>> expression);
}
}

Просмотреть файл

@ -2,6 +2,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
@ -27,5 +28,14 @@ namespace Microsoft.StreamProcessing.Provider
/// The assigned provider whose job it is to evaluate the query once it is constructed.
/// </summary>
public IQStreamableProvider Provider { get; }
/// <summary>
///
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="expression"></param>
/// <returns></returns>
public IObservable<TResult> ToTemporalObservable<TResult>(Expression<Func<long, long, TPayload, TResult>> expression)
=> throw new System.NotImplementedException();
}
}

Просмотреть файл

@ -3,7 +3,6 @@
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
namespace Microsoft.StreamProcessing.Provider
@ -45,33 +44,6 @@ namespace Microsoft.StreamProcessing.Provider
PartitionPolicy partitionPolicy)
=> throw new NotImplementedException();
private void TestMethod()
{
IQStreamable<Tuple<string, int>> test0 = null;
IQStreamable<Tuple<string, int>> test1 = null;
IQStreamable<Tuple<string, int>> test2 = null;
var test3 = from t in test1
join t1 in test0 on t.Item1 equals t1.Item1
join t2 in test2 on t.Item1 equals t2.Item1
// join t0 in test0 on t.Item1 equals t0.Item1
where t.Item2 < 10
group t by t.Item1 into g
select g.Window.Count();
}
private void TestMethod2()
{
IQStreamable<Tuple<string, int, IEnumerable<object>>> test1 = null;
var test3 = from t in test1
from o in t.Item3
// join t0 in test0 on t.Item1 equals t0.Item1
where t.Item2 < 10
group o by t.Item1 into g
select g.Window.Count();
}
/// <summary>
/// Stub
/// </summary>

Просмотреть файл

@ -0,0 +1,46 @@
// *********************************************************************
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License
// *********************************************************************
using System;
using System.Collections.Generic;
namespace Microsoft.StreamProcessing.Provider
{
internal class Tests
{
private void TestMethod()
{
var qc = new QueryContext();
IObservable<Tuple<string, int, long, long>> obs0 = null;
IObservable<Tuple<string, int>> obs1 = null;
IObservable<Tuple<string, int, long>> obs2 = null;
IQStreamable<Tuple<string, int, long, long>> test0 = qc.RegisterStream(obs0, o => o.Item3, o => o.Item4);
IQStreamable<Tuple<string, int>> test1 = qc.RegisterStream(obs1, o => 0, o => 0);
IQStreamable<Tuple<string, int, long>> test2 = qc.RegisterStream(obs2, o => o.Item3, o => o.Item3 + 1);
var test3 = from t in test1
join t1 in test0 on t.Item1 equals t1.Item1
join t2 in test2 on t.Item1 equals t2.Item1
// join t0 in test0 on t.Item1 equals t0.Item1
where t.Item2 < 10
let f = t2.Item1
group t by t.Item1 into g
select g.Window.Count();
}
private void TestMethod2()
{
IQStreamable<Tuple<string, int, IEnumerable<object>>> test1 = null;
var test3 = from t in test1
from o in t.Item3
// join t0 in test0 on t.Item1 equals t0.Item1
where t.Item2 < 10
group o by t.Item1 into g
select g.Window.Count();
}
}
}