Bug 1826537 - Part 1: Return unsettled promise from InputToReadableStreamAlgorithms::PullCallbackImpl r=smaug

Repeating D169484 before merging those classes.

(Skipping AsyncWait cancellation as per Nika streams should not reject resubscription. I'll fix it separately.)

Differential Revision: https://phabricator.services.mozilla.com/D174764
This commit is contained in:
Kagami Sascha Rosylight 2023-04-13 11:22:53 +00:00
Родитель 33700c125f
Коммит b6100b0007
4 изменённых файлов: 56 добавлений и 56 удалений

Просмотреть файл

@ -279,7 +279,9 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
return;
}
rv = mInputStream->AsyncWait(this, nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0,
// Subscribe WAIT_CLOSURE_ONLY so that OnInputStreamReady can be called when
// mInputStream is closed.
rv = mInputStream->AsyncWait(this, nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0,
mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, aStream, rv);
@ -453,9 +455,9 @@ BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
return NS_OK;
}
// Not having a promise means we are pinged by stream closure, but here we
// still have more data to read. Let's wait for the next read request in that
// case.
// Not having a promise means we are pinged by stream closure
// (WAIT_CLOSURE_ONLY below), but here we still have more data to read. Let's
// wait for the next read request in that case.
if (!mPullPromise) {
return NS_OK;
}

Просмотреть файл

@ -149,6 +149,9 @@ class BodyStream final : public nsIInputStreamCallback,
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
// Same as mGlobal but set to nullptr on OnInputStreamReady (on the owning
// thread).
// This promise is created by PullCallback and resolved when
// OnInputStreamReady succeeds. No need to try hard to settle it though, see
// also ReleaseObjects() for the reason.
RefPtr<Promise> mPullPromise;
// This is the original inputStream received during the CTOR. It will be

Просмотреть файл

@ -211,7 +211,7 @@ NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED_0(
InputToReadableStreamAlgorithms, UnderlyingSourceAlgorithmsWrapper)
NS_IMPL_CYCLE_COLLECTION_INHERITED(InputToReadableStreamAlgorithms,
UnderlyingSourceAlgorithmsWrapper, mStream,
mOwningEventTarget)
mOwningEventTarget, mPullPromise)
already_AddRefed<Promise> InputToReadableStreamAlgorithms::PullCallbackImpl(
JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
@ -221,30 +221,11 @@ already_AddRefed<Promise> InputToReadableStreamAlgorithms::PullCallbackImpl(
MOZ_DIAGNOSTIC_ASSERT(stream->Disturbed());
MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
mState == eChecking || mState == eReading);
MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eInitialized);
MOZ_ASSERT(!mPullPromise);
mPullPromise = Promise::CreateInfallible(aController.GetParentObject());
RefPtr<Promise> resolvedWithUndefinedPromise =
Promise::CreateResolvedWithUndefined(aController.GetParentObject(), aRv);
if (aRv.Failed()) {
return nullptr;
}
if (mState == eReading) {
// We are already reading data.
return resolvedWithUndefinedPromise.forget();
}
if (mState == eChecking) {
// If we are looking for more data, there is nothing else we should do:
// let's move this checking operation in a reading.
MOZ_ASSERT(mInput);
mState = eReading;
return resolvedWithUndefinedPromise.forget();
}
mState = eReading;
mState = eInitialized;
MOZ_DIAGNOSTIC_ASSERT(mInput);
@ -255,7 +236,7 @@ already_AddRefed<Promise> InputToReadableStreamAlgorithms::PullCallbackImpl(
}
// All good.
return resolvedWithUndefinedPromise.forget();
return do_AddRef(mPullPromise);
}
NS_IMETHODIMP
@ -272,17 +253,13 @@ InputToReadableStreamAlgorithms::OnInputStreamReady(
"InputToReadableStream data available");
MOZ_DIAGNOSTIC_ASSERT(mInput);
MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
MOZ_DIAGNOSTIC_ASSERT(mState == eInitialized);
JSContext* cx = aes.cx();
uint64_t size = 0;
nsresult rv = mInput->Available(&size);
if (NS_SUCCEEDED(rv) && size == 0) {
// In theory this should not happen. If size is 0, the stream should be
// considered closed.
rv = NS_BASE_STREAM_CLOSED;
}
MOZ_ASSERT_IF(NS_SUCCEEDED(rv), size > 0);
// No warning for stream closed.
if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
@ -290,13 +267,15 @@ InputToReadableStreamAlgorithms::OnInputStreamReady(
return NS_OK;
}
// This extra checking is completed. Let's wait for the next read request.
if (mState == eChecking) {
mState = eWaiting;
// Not having a promise means we are pinged by stream closure
// (WAIT_CLOSURE_ONLY below), but here we still have more data to read. Let's
// wait for the next read request in that case.
if (!mPullPromise) {
return NS_OK;
}
mState = eWriting;
MOZ_DIAGNOSTIC_ASSERT(mPullPromise->State() ==
Promise::PromiseState::Pending);
ErrorResult errorResult;
EnqueueChunkWithSizeIntoStream(cx, mStream, size, errorResult);
@ -306,6 +285,20 @@ InputToReadableStreamAlgorithms::OnInputStreamReady(
return NS_OK;
}
// Enqueuing triggers read request chunk steps which may execute JS, trigger
// another OnInputStreamReady, and close the stream. Let's be careful here.
//
// XXX(krosylight): But it shouldn't happen, nsIAsyncInputStream
// implementations should prevent such synchronous reentrance by nullifying
// the callback member first before calling it. (Bug 1827418)
// That said, it's generally good to be cautious as there's no guarantee that
// the interface is implemented in a safest way.
MOZ_DIAGNOSTIC_ASSERT(mPullPromise);
if (mPullPromise) {
mPullPromise->MaybeResolveWithUndefined();
mPullPromise = nullptr;
}
// The previous call can execute JS (even up to running a nested event
// loop), so |mState| can't be asserted to have any particular value, even
// if the previous call succeeds.
@ -319,8 +312,9 @@ void InputToReadableStreamAlgorithms::WriteIntoReadRequestBuffer(
MOZ_DIAGNOSTIC_ASSERT(aBuffer);
MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
MOZ_DIAGNOSTIC_ASSERT(mInput);
MOZ_DIAGNOSTIC_ASSERT(mState == eWriting);
mState = eChecking;
MOZ_DIAGNOSTIC_ASSERT(mState == eInitialized);
MOZ_DIAGNOSTIC_ASSERT(mPullPromise->State() ==
Promise::PromiseState::Pending);
uint32_t written;
nsresult rv;
@ -353,7 +347,10 @@ void InputToReadableStreamAlgorithms::WriteIntoReadRequestBuffer(
return;
}
rv = mInput->AsyncWait(0, 0, mOwningEventTarget);
// Subscribe WAIT_CLOSURE_ONLY so that OnInputStreamReady can be called when
// mInput is closed.
rv = mInput->AsyncWait(nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0,
mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, aStream, rv);
return;
@ -425,6 +422,11 @@ void InputToReadableStreamAlgorithms::ReleaseObjects() {
mInput->Shutdown();
mInput = nullptr;
}
// It's okay to leave a potentially unsettled promise as-is as this is only
// used to prevent reentrant to PullCallback. CloseNative() or ErrorNative()
// will settle the read requests for us.
mPullPromise = nullptr;
}
void InputToReadableStreamAlgorithms::ErrorPropagation(JSContext* aCx,

Просмотреть файл

@ -251,20 +251,8 @@ class InputToReadableStreamAlgorithms final
// This is the beginning state before any reading operation.
eInitializing,
// RequestDataCallback has not been called yet. We haven't started to read
// data from the stream yet.
eWaiting,
// We are reading data in a separate I/O thread.
eReading,
// We are ready to write something in the JS Buffer.
eWriting,
// After a writing, we want to check if the stream is closed. After the
// check, we go back to eWaiting. If a reading request happens in the
// meantime, we move to eReading state.
eChecking,
// Stream is initialized and being consumed.
eInitialized,
// Operation completed.
eClosed,
@ -274,6 +262,11 @@ class InputToReadableStreamAlgorithms final
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
// This promise is created by PullCallback and resolved when
// OnInputStreamReady succeeds. No need to try hard to settle it though, see
// also ReleaseObjects() for the reason.
RefPtr<Promise> mPullPromise;
RefPtr<InputStreamHolder> mInput;
RefPtr<ReadableStream> mStream;
};