[System]: Use new `WebCompletionSource` instead of `TaskCompletionSource`. (#7293)

This new helper class uses `TaskCompletionSource<>` internally, but instead
of using TrySetException(), it sets a result containing an `ExceptionDispatchInfo`.

The problem with using TrySetException() is that it wraps the exception
object in an `AggregateException`, that would be unobserved if the caller
throws the original exception.
This commit is contained in:
Martin Baulig 2018-02-28 03:37:24 -05:00 коммит произвёл Marek Safar
Родитель dfe6e2d4a2
Коммит 8facc4afb2
5 изменённых файлов: 145 добавлений и 38 удалений

Просмотреть файл

@ -98,7 +98,7 @@ namespace System.Net
WebRequestStream writeStream;
HttpWebResponse webResponse;
TaskCompletionSource<HttpWebResponse> responseTask;
WebCompletionSource responseTask;
WebOperation currentOperation;
int aborted;
bool gotRequestStream;
@ -975,15 +975,16 @@ namespace System.Net
if (!sendChunked && transferEncoding != null && transferEncoding.Trim () != "")
throw new ProtocolViolationException ("SendChunked should be true.");
var myTcs = new TaskCompletionSource<HttpWebResponse> ();
var completion = new WebCompletionSource ();
WebOperation operation;
lock (locker) {
getResponseCalled = true;
var oldTcs = Interlocked.CompareExchange (ref responseTask, myTcs, null);
WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldTcs != null}");
if (oldTcs != null) {
if (haveResponse && oldTcs.Task.IsCompleted)
return oldTcs.Task.Result;
var oldCompletion = Interlocked.CompareExchange (ref responseTask, completion, null);
WebConnection.Debug ($"HWR GET RESPONSE: Req={ID} {oldCompletion != null}");
if (oldCompletion != null) {
oldCompletion.ThrowOnError ();
if (haveResponse && oldCompletion.IsCompleted)
return webResponse;
throw new InvalidOperationException ("Cannot re-call start of asynchronous " +
"method while a previous call is still in progress.");
}
@ -1030,14 +1031,14 @@ namespace System.Net
if (throwMe != null) {
WebConnection.Debug ($"HWR GET RESPONSE LOOP #1 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}");
haveResponse = true;
myTcs.TrySetException (throwMe);
completion.TrySetException (throwMe);
throw throwMe;
}
if (!redirect) {
haveResponse = true;
webResponse = response;
myTcs.TrySetResult (response);
completion.TrySetCompleted ();
return response;
}
@ -1063,7 +1064,7 @@ namespace System.Net
WebConnection.Debug ($"HWR GET RESPONSE LOOP #3 EX: Req={ID} {throwMe.Status} {throwMe.InnerException?.GetType ()}");
haveResponse = true;
stream?.Close ();
myTcs.TrySetException (throwMe);
completion.TrySetException (throwMe);
throw throwMe;
}

Просмотреть файл

@ -0,0 +1,103 @@
//
// WebCompletionSource.cs
//
// Author:
// Martin Baulig <mabaul@microsoft.com>
//
// Copyright (c) 2018 Xamarin Inc. (http://www.xamarin.com)
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.ExceptionServices;
namespace System.Net
{
class WebCompletionSource
{
TaskCompletionSource<Result> completion;
public WebCompletionSource ()
{
completion = new TaskCompletionSource<Result> ();
}
public bool TrySetCompleted ()
{
return completion.TrySetResult (new Result (State.Completed, null));
}
public bool TrySetCanceled ()
{
var error = new OperationCanceledException ();
var result = new Result (State.Canceled, ExceptionDispatchInfo.Capture (error));
return completion.TrySetResult (result);
}
public bool TrySetException (Exception error)
{
var result = new Result (State.Faulted, ExceptionDispatchInfo.Capture (error));
return completion.TrySetResult (result);
}
public bool IsCompleted => completion.Task.IsCompleted;
public void ThrowOnError ()
{
if (!completion.Task.IsCompleted)
return;
completion.Task.Result.Error?.Throw ();
}
public async Task<bool> WaitForCompletion (bool throwOnError)
{
var result = await completion.Task.ConfigureAwait (false);
if (result.State == State.Completed)
return true;
if (throwOnError)
result.Error.Throw ();
return false;
}
enum State : int {
Running,
Completed,
Canceled,
Faulted
}
class Result
{
public State State {
get;
}
public ExceptionDispatchInfo Error {
get;
}
public Result (State state, ExceptionDispatchInfo error)
{
State = state;
Error = error;
}
}
}
}

Просмотреть файл

@ -39,7 +39,7 @@ namespace System.Net
bool requestWritten;
bool allowBuffering;
bool sendChunked;
TaskCompletionSource<int> pendingWrite;
WebCompletionSource pendingWrite;
long totalWritten;
byte[] headers;
bool headersSent;
@ -152,8 +152,8 @@ namespace System.Net
if (Operation.WriteBuffer != null)
throw new InvalidOperationException ();
var myWriteTcs = new TaskCompletionSource<int> ();
if (Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null) != null)
var completion = new WebCompletionSource ();
if (Interlocked.CompareExchange (ref pendingWrite, completion, null) != null)
throw new InvalidOperationException (SR.GetString (SR.net_repcall));
return WriteAsyncInner (buffer, offset, count, myWriteTcs, cancellationToken);
@ -172,7 +172,7 @@ namespace System.Net
await FinishWriting (cancellationToken);
pendingWrite = null;
myWriteTcs.TrySetResult (0);
completion.TrySetCompleted ();
} catch (Exception ex) {
KillBuffer ();
closed = true;
@ -185,7 +185,7 @@ namespace System.Net
Operation.CompleteRequestWritten (this, ex);
pendingWrite = null;
myWriteTcs.TrySetException (ex);
completion.TrySetException (ex);
throw;
}
}
@ -374,11 +374,12 @@ namespace System.Net
cts.CancelAfter (WriteTimeout);
var timeoutTask = Task.Delay (WriteTimeout);
while (true) {
var myWriteTcs = new TaskCompletionSource<int> ();
var oldTcs = Interlocked.CompareExchange (ref pendingWrite, myWriteTcs, null);
if (oldTcs == null)
var completion = new WebCompletionSource ();
var oldCompletion = Interlocked.CompareExchange (ref pendingWrite, completion, null);
if (oldCompletion == null)
break;
var ret = await Task.WhenAny (timeoutTask, oldTcs.Task).ConfigureAwait (false);
var oldWriteTask = oldCompletion.WaitForCompletion (true);
var ret = await Task.WhenAny (timeoutTask, oldWriteTask).ConfigureAwait (false);
if (ret == timeoutTask)
throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout);
}

Просмотреть файл

@ -41,7 +41,7 @@ namespace System.Net
long totalRead;
bool nextReadCalled;
int stream_length; // -1 when CL not present
TaskCompletionSource<int> readTcs;
WebCompletionSource pendingRead;
object locker = new object ();
int nestedRead;
bool read_eof;
@ -125,16 +125,16 @@ namespace System.Net
if (Interlocked.CompareExchange (ref nestedRead, 1, 0) != 0)
throw new InvalidOperationException ("Invalid nested call.");
var myReadTcs = new TaskCompletionSource<int> ();
var completion = new WebCompletionSource ();
while (!cancellationToken.IsCancellationRequested) {
/*
* 'readTcs' is set by ReadAllAsync().
* 'currentRead' is set by ReadAllAsync().
*/
var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null);
WebConnection.Debug ($"{ME} READ ASYNC #1: {oldReadTcs != null}");
if (oldReadTcs == null)
var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null);
WebConnection.Debug ($"{ME} READ ASYNC #1: {oldCompletion != null}");
if (oldCompletion == null)
break;
await oldReadTcs.Task.ConfigureAwait (false);
await oldCompletion.WaitForCompletion (true).ConfigureAwait (false);
}
WebConnection.Debug ($"{ME} READ ASYNC #2: {totalRead} {contentLength}");
@ -158,8 +158,8 @@ namespace System.Net
if (throwMe != null) {
lock (locker) {
myReadTcs.TrySetException (throwMe);
readTcs = null;
completion.TrySetException (throwMe);
pendingRead = null;
nestedRead = 0;
}
@ -169,8 +169,8 @@ namespace System.Net
}
lock (locker) {
readTcs.TrySetResult (oldBytes + nbytes);
readTcs = null;
pendingRead.TrySetCompleted ();
pendingRead = null;
nestedRead = 0;
}
@ -405,18 +405,19 @@ namespace System.Net
}
var timeoutTask = Task.Delay (ReadTimeout);
var myReadTcs = new TaskCompletionSource<int> ();
var completion = new WebCompletionSource ();
while (true) {
/*
* 'readTcs' is set by ReadAsync().
* 'currentRead' is set by ReadAsync().
*/
cancellationToken.ThrowIfCancellationRequested ();
var oldReadTcs = Interlocked.CompareExchange (ref readTcs, myReadTcs, null);
if (oldReadTcs == null)
var oldCompletion = Interlocked.CompareExchange (ref pendingRead, completion, null);
if (oldCompletion == null)
break;
// ReadAsync() is in progress.
var anyTask = await Task.WhenAny (oldReadTcs.Task, timeoutTask).ConfigureAwait (false);
var oldReadTask = oldCompletion.WaitForCompletion (true);
var anyTask = await Task.WhenAny (oldReadTask, timeoutTask).ConfigureAwait (false);
if (anyTask == timeoutTask)
throw new WebException ("The operation has timed out.", WebExceptionStatus.Timeout);
}
@ -495,14 +496,14 @@ namespace System.Net
readBuffer = new BufferOffsetSize (b, 0, new_size, false);
totalRead = 0;
nextReadCalled = true;
myReadTcs.TrySetResult (new_size);
completion.TrySetCompleted ();
} catch (Exception ex) {
WebConnection.Debug ($"{ME} READ ALL ASYNC EX: {ex.Message}");
myReadTcs.TrySetException (ex);
completion.TrySetException (ex);
throw;
} finally {
WebConnection.Debug ($"{ME} READ ALL ASYNC #2");
readTcs = null;
pendingRead = null;
}
Operation.CompleteResponseRead (true);

Просмотреть файл

@ -43,6 +43,7 @@ System.Net/ServicePoint.cs
System.Net/ServicePointManager.cs
System.Net/ServicePointManager.extra.cs
System.Net/ServicePointScheduler.cs
System.Net/WebCompletionSource.cs
System.Net/WebConnection.cs
System.Net/WebConnectionStream.cs
System.Net/WebConnectionTunnel.cs