Use ValueTask on schedulers.
This commit is contained in:
Родитель
3bb62ff3c1
Коммит
2c82a0f86f
|
@ -90,7 +90,7 @@ namespace System.Reactive.Concurrency
|
|||
tcs.SetResult(true);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}, dueTime);
|
||||
|
||||
using (token.Register(() => task.DisposeAsync()))
|
||||
|
@ -117,7 +117,7 @@ namespace System.Reactive.Concurrency
|
|||
tcs.SetResult(true);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}, dueTime);
|
||||
|
||||
using (token.Register(() => task.DisposeAsync()))
|
||||
|
|
|
@ -11,7 +11,7 @@ namespace System.Reactive.Concurrency
|
|||
{
|
||||
public virtual DateTimeOffset Now => DateTimeOffset.Now;
|
||||
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action)
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action)
|
||||
{
|
||||
if (action == null)
|
||||
throw new ArgumentNullException(nameof(action));
|
||||
|
@ -19,7 +19,7 @@ namespace System.Reactive.Concurrency
|
|||
return ScheduleAsyncCore(action);
|
||||
}
|
||||
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, TimeSpan dueTime)
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, TimeSpan dueTime)
|
||||
{
|
||||
if (action == null)
|
||||
throw new ArgumentNullException(nameof(action));
|
||||
|
@ -34,7 +34,7 @@ namespace System.Reactive.Concurrency
|
|||
});
|
||||
}
|
||||
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, DateTimeOffset dueTime)
|
||||
public virtual ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, DateTimeOffset dueTime)
|
||||
{
|
||||
if (action == null)
|
||||
throw new ArgumentNullException(nameof(action));
|
||||
|
@ -49,7 +49,7 @@ namespace System.Reactive.Concurrency
|
|||
});
|
||||
}
|
||||
|
||||
protected virtual async ValueTask<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, Task> action)
|
||||
protected virtual async ValueTask<IAsyncDisposable> ScheduleAsyncCore(Func<CancellationToken, ValueTask> action)
|
||||
{
|
||||
var cad = new CancellationAsyncDisposable();
|
||||
|
||||
|
@ -58,9 +58,9 @@ namespace System.Reactive.Concurrency
|
|||
return cad;
|
||||
}
|
||||
|
||||
protected abstract Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token);
|
||||
protected abstract ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token);
|
||||
|
||||
protected abstract Task Delay(TimeSpan dueTime, CancellationToken token);
|
||||
protected abstract ValueTask Delay(TimeSpan dueTime, CancellationToken token);
|
||||
|
||||
protected static TimeSpan Normalize(TimeSpan timeSpan) => timeSpan < TimeSpan.Zero ? TimeSpan.Zero : timeSpan;
|
||||
|
||||
|
|
|
@ -13,8 +13,8 @@ namespace System.Reactive.Concurrency
|
|||
|
||||
private ImmediateAsyncScheduler() { }
|
||||
|
||||
protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime);
|
||||
protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime));
|
||||
|
||||
protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token) => action(token);
|
||||
protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token) => action(token);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,9 +16,9 @@ namespace System.Reactive.Concurrency
|
|||
_context = context ?? throw new ArgumentNullException(nameof(context));
|
||||
}
|
||||
|
||||
protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime, token);
|
||||
protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime, token));
|
||||
|
||||
protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token)
|
||||
protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token)
|
||||
{
|
||||
_context.Post(_ =>
|
||||
{
|
||||
|
@ -28,7 +28,7 @@ namespace System.Reactive.Concurrency
|
|||
}
|
||||
}, null);
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,11 +27,11 @@ namespace System.Reactive.Concurrency
|
|||
_factory = factory ?? throw new ArgumentNullException(nameof(factory));
|
||||
}
|
||||
|
||||
protected override Task Delay(TimeSpan dueTime, CancellationToken token) => Task.Delay(dueTime, token);
|
||||
protected override ValueTask Delay(TimeSpan dueTime, CancellationToken token) => new ValueTask(Task.Delay(dueTime, token));
|
||||
|
||||
protected override Task ScheduleAsyncCore(Func<CancellationToken, Task> action, CancellationToken token)
|
||||
protected override ValueTask ScheduleAsyncCore(Func<CancellationToken, ValueTask> action, CancellationToken token)
|
||||
{
|
||||
var task = _factory.StartNew(() => action(token), token);
|
||||
var task = _factory.StartNew(() => action(token).AsTask(), token);
|
||||
|
||||
task.Unwrap().ContinueWith(t =>
|
||||
{
|
||||
|
@ -41,7 +41,7 @@ namespace System.Reactive.Concurrency
|
|||
}
|
||||
});
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,8 +9,8 @@ namespace System.Reactive.Concurrency
|
|||
{
|
||||
public interface IAsyncScheduler : IClock
|
||||
{
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action);
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, TimeSpan dueTime);
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, Task> action, DateTimeOffset dueTime);
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action);
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, TimeSpan dueTime);
|
||||
ValueTask<IAsyncDisposable> ScheduleAsync(Func<CancellationToken, ValueTask> action, DateTimeOffset dueTime);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -187,7 +187,7 @@ namespace System.Runtime.CompilerServices
|
|||
{
|
||||
ExceptionDispatchInfo.Capture(exception).Throw();
|
||||
|
||||
return System.Threading.Tasks.Task.CompletedTask;
|
||||
return default;
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
@ -27,6 +27,6 @@ namespace System.Reactive
|
|||
|
||||
protected override IAwaitable<R> RendezVous<R>(ValueTask<R> task) => new ValueTaskAwaitable<R>(task, false, null, CancellationToken.None);
|
||||
|
||||
protected override Task ScheduleAsync() => RunAsync(_disposable.Token);
|
||||
protected override ValueTask ScheduleAsync() => RunAsync(_disposable.Token);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,8 +8,8 @@ namespace System.Reactive
|
|||
{
|
||||
internal interface IScheduledAsyncObserver<T> : IAsyncObserver<T>, IAsyncDisposable
|
||||
{
|
||||
Task EnsureActive();
|
||||
ValueTask EnsureActive();
|
||||
|
||||
Task EnsureActive(int count);
|
||||
ValueTask EnsureActive(int count);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ namespace System.Reactive
|
|||
|
||||
protected override IAwaitable<R> RendezVous<R>(ValueTask<R> task) => new ValueTaskAwaitable<R>(task, false, _scheduler, CancellationToken.None);
|
||||
|
||||
protected override async Task ScheduleAsync()
|
||||
protected override async ValueTask ScheduleAsync()
|
||||
{
|
||||
var d = await _scheduler.ScheduleAsync(RunAsync).ConfigureAwait(false);
|
||||
await _disposable.AssignAsync(d).ConfigureAwait(false);
|
||||
|
|
|
@ -25,9 +25,9 @@ namespace System.Reactive
|
|||
_observer = observer;
|
||||
}
|
||||
|
||||
public Task EnsureActive() => EnsureActive(1);
|
||||
public ValueTask EnsureActive() => EnsureActive(1);
|
||||
|
||||
public async Task EnsureActive(int count)
|
||||
public async ValueTask EnsureActive(int count)
|
||||
{
|
||||
var shouldRun = false;
|
||||
|
||||
|
@ -46,9 +46,9 @@ namespace System.Reactive
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract Task ScheduleAsync();
|
||||
protected abstract ValueTask ScheduleAsync();
|
||||
|
||||
protected async Task RunAsync(CancellationToken token)
|
||||
protected async ValueTask RunAsync(CancellationToken token)
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
|
|
|
@ -230,11 +230,11 @@ namespace System.Reactive.Subjects
|
|||
}
|
||||
}
|
||||
|
||||
private async Task EnsureActive(IScheduledAsyncObserver<T>[] observers)
|
||||
private async ValueTask EnsureActive(IScheduledAsyncObserver<T>[] observers)
|
||||
{
|
||||
if (_concurrent)
|
||||
{
|
||||
await Task.WhenAll(observers.Select(o => o.EnsureActive())).ConfigureAwait(false);
|
||||
await Task.WhenAll(observers.Select(o => o.EnsureActive().AsTask())).ConfigureAwait(false);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -285,9 +285,9 @@ namespace System.Reactive.Subjects
|
|||
|
||||
protected abstract IScheduledAsyncObserver<T> CreateScheduledObserver(IAsyncObserver<T> observer);
|
||||
|
||||
protected abstract Task NextAsync(T value);
|
||||
protected abstract ValueTask NextAsync(T value);
|
||||
|
||||
protected abstract Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer);
|
||||
protected abstract ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer);
|
||||
|
||||
protected abstract void Trim();
|
||||
|
||||
|
@ -337,15 +337,15 @@ namespace System.Reactive.Subjects
|
|||
{
|
||||
}
|
||||
|
||||
protected override Task NextAsync(T value)
|
||||
protected override ValueTask NextAsync(T value)
|
||||
{
|
||||
_hasValue = true;
|
||||
_value = value;
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}
|
||||
|
||||
protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
{
|
||||
if (_hasValue)
|
||||
{
|
||||
|
@ -368,14 +368,14 @@ namespace System.Reactive.Subjects
|
|||
{
|
||||
}
|
||||
|
||||
protected override Task NextAsync(T value)
|
||||
protected override ValueTask NextAsync(T value)
|
||||
{
|
||||
Values.Enqueue(value);
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}
|
||||
|
||||
protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
{
|
||||
var count = Values.Count;
|
||||
|
||||
|
@ -432,14 +432,14 @@ namespace System.Reactive.Subjects
|
|||
_window = window;
|
||||
}
|
||||
|
||||
protected override Task NextAsync(T value)
|
||||
protected override ValueTask NextAsync(T value)
|
||||
{
|
||||
_values.Enqueue(new Timestamped<T>(value, _scheduler.Now));
|
||||
|
||||
return Task.CompletedTask;
|
||||
return default;
|
||||
}
|
||||
|
||||
protected override async Task<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
protected override async ValueTask<int> ReplayAsync(IScheduledAsyncObserver<T> observer)
|
||||
{
|
||||
var count = _values.Count;
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ namespace System.Threading
|
|||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
|
||||
private readonly AsyncLocal<int> _recursionCount = new AsyncLocal<int>();
|
||||
|
||||
public Task<Releaser> LockAsync()
|
||||
public ValueTask<Releaser> LockAsync()
|
||||
{
|
||||
var shouldAcquire = false;
|
||||
|
||||
|
@ -32,10 +32,10 @@ namespace System.Threading
|
|||
|
||||
if (shouldAcquire)
|
||||
{
|
||||
return _semaphore.WaitAsync().ContinueWith(_ => new Releaser(this));
|
||||
return new ValueTask<Releaser>(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this)));
|
||||
}
|
||||
|
||||
return Task.FromResult(new Releaser(this));
|
||||
return new ValueTask<Releaser>(new Releaser(this));
|
||||
}
|
||||
|
||||
private void Release()
|
||||
|
|
Загрузка…
Ссылка в новой задаче