зеркало из https://github.com/microsoft/Tx.git
Adding StartTime/EndTime to Playback, and fix for task names
This commit is contained in:
Родитель
107d7f1fa2
Коммит
1c04b2b352
|
@ -10,11 +10,15 @@ namespace System.Reactive
|
|||
private readonly List<IDeserializer<TInput>> _deserializers;
|
||||
private IObserver<Timestamped<object>> _observer;
|
||||
|
||||
public DateTime StartTime { get; set; }
|
||||
public DateTime EndTime { get; set; }
|
||||
|
||||
public CompositeDeserializer(
|
||||
IObserver<Timestamped<object>> observer,
|
||||
params Type[] typeMaps)
|
||||
{
|
||||
_observer = observer;
|
||||
|
||||
_deserializers = new List<IDeserializer<TInput>>();
|
||||
foreach (Type mapType in typeMaps)
|
||||
{
|
||||
|
@ -88,6 +92,15 @@ namespace System.Reactive
|
|||
Timestamped<object> ts;
|
||||
if (d.TryDeserialize(value, out ts))
|
||||
{
|
||||
// TODO: this achieves the right semantics,
|
||||
// but the performance will be sub optimal
|
||||
|
||||
if (ts.Timestamp.DateTime < StartTime)
|
||||
return;
|
||||
|
||||
if (ts.Timestamp.DateTime > EndTime)
|
||||
return;
|
||||
|
||||
_observer.OnNext(ts);
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -31,12 +31,14 @@ namespace System.Reactive
|
|||
private PullMergeSort<Timestamped<object>> _mergesort;
|
||||
private OutputPump<Timestamped<object>> _pump;
|
||||
private ManualResetEvent _pumpStart;
|
||||
private DateTime _startTime = DateTime.MinValue;
|
||||
|
||||
/// <summary>
|
||||
/// Constructor
|
||||
/// </summary>
|
||||
protected PlaybackBase()
|
||||
{
|
||||
EndTime = DateTime.MaxValue;
|
||||
_inputs = new List<IInputStream>();
|
||||
_demux = new Demultiplexor();
|
||||
_timeSource = new TimeSource<Timestamped<object>>(_subject, e => e.Timestamp);
|
||||
|
@ -51,12 +53,21 @@ namespace System.Reactive
|
|||
/// <summary>
|
||||
/// Gets or sets the start time for the playback
|
||||
/// The setter must be called before any operators that take Scheduler are used
|
||||
///
|
||||
/// Playback will only deliver event after the given start time
|
||||
/// </summary>
|
||||
public DateTimeOffset StartTime
|
||||
public DateTime StartTime
|
||||
{
|
||||
get { return _timeSource.StartTime; }
|
||||
set { _timeSource.StartTime = value; }
|
||||
get { return _startTime; }
|
||||
set
|
||||
{
|
||||
_startTime = value;
|
||||
_timeSource.StartTime = new DateTimeOffset(value);
|
||||
}
|
||||
}
|
||||
|
||||
// set this to ignore events past given time
|
||||
public DateTime EndTime { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// The event types that are known
|
||||
|
@ -108,7 +119,7 @@ namespace System.Reactive
|
|||
Expression<Func<IObservable<TInput>>> createInput,
|
||||
params Type[] typeMaps)
|
||||
{
|
||||
var input = new InputStream<TInput>(createInput, typeMaps);
|
||||
var input = new InputStream<TInput>(createInput, StartTime, EndTime, typeMaps);
|
||||
_inputs.Add(input);
|
||||
}
|
||||
|
||||
|
@ -138,17 +149,20 @@ namespace System.Reactive
|
|||
/// The main use case is real-time feeds.
|
||||
/// </summary>
|
||||
public void Start()
|
||||
{
|
||||
if (KnownTypes != null)
|
||||
{
|
||||
foreach (Type t in KnownTypes)
|
||||
{
|
||||
foreach (IInputStream i in _inputs)
|
||||
{
|
||||
i.StartTime = StartTime;
|
||||
i.EndTime = EndTime;
|
||||
|
||||
if (KnownTypes == null)
|
||||
continue;
|
||||
|
||||
foreach (Type t in KnownTypes)
|
||||
{
|
||||
i.AddKnownType(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (_inputs.Count == 0)
|
||||
throw new Exception("No input sequences were added to the Playback");
|
||||
|
@ -214,6 +228,9 @@ namespace System.Reactive
|
|||
void AddKnownType(Type t);
|
||||
void Start();
|
||||
void Start(IObserver<Timestamped<object>> observer);
|
||||
|
||||
DateTime StartTime { get; set; }
|
||||
DateTime EndTime { get; set; }
|
||||
}
|
||||
|
||||
protected class InputStream<TInput> : IInputStream, IDisposable
|
||||
|
@ -225,6 +242,8 @@ namespace System.Reactive
|
|||
|
||||
public InputStream(
|
||||
Expression<Func<IObservable<TInput>>> createInput,
|
||||
DateTime startTime,
|
||||
DateTime endTime,
|
||||
params Type[] typeMaps)
|
||||
{
|
||||
_source = createInput.Compile()();
|
||||
|
@ -232,6 +251,19 @@ namespace System.Reactive
|
|||
_deserializer = new CompositeDeserializer<TInput>(_output, typeMaps);
|
||||
}
|
||||
|
||||
public DateTime StartTime
|
||||
{
|
||||
get { return _deserializer.StartTime; }
|
||||
set { _deserializer.StartTime = value; }
|
||||
}
|
||||
|
||||
public DateTime EndTime
|
||||
{
|
||||
get { return _deserializer.EndTime; }
|
||||
set { _deserializer.EndTime = value; }
|
||||
}
|
||||
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_subscription != null)
|
||||
|
|
|
@ -689,7 +689,7 @@ using System;");
|
|||
var mapCollection = new Dictionary<string, string>();
|
||||
foreach (var taskValue in tasks.Elements())
|
||||
{
|
||||
var taskEnumIdentifier = taskValue.Attribute(AttributeNames.Name).Value;
|
||||
var taskEnumIdentifier = NameUtils.CreateIdentifier(taskValue.Attribute(AttributeNames.Name).Value);
|
||||
var taskEnumValue = taskValue.Attribute(AttributeNames.Value).Value;
|
||||
|
||||
sb.AppendFormat(" {0} = {1},", taskEnumIdentifier, taskEnumValue);
|
||||
|
|
Загрузка…
Ссылка в новой задаче