Bug 1741941 - Implement ReadableByteStreamTee r=smaug

Differential Revision: https://phabricator.services.mozilla.com/D131549
This commit is contained in:
Matthew Gaudet 2022-01-04 22:33:24 +00:00
Родитель 363163fe4f
Коммит 8425a02f89
13 изменённых файлов: 1105 добавлений и 12 удалений

Просмотреть файл

@ -65,8 +65,7 @@ bool CanTransferArrayBuffer(JSContext* aCx, JS::Handle<JSObject*> aObject,
}
// https://streams.spec.whatwg.org/#abstract-opdef-cloneasuint8array
JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject aObject,
ErrorResult& aRv) {
JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject aObject) {
// Step 1. Assert: Type(O) is Object. Implicit.
// Step 2. Assert: O has an [[ViewedArrayBuffer]] internal slot.
MOZ_ASSERT(JS_IsArrayBufferViewObject(aObject));
@ -76,7 +75,6 @@ JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject aObject,
JS::RootedObject viewedArrayBuffer(
aCx, JS_GetArrayBufferViewBuffer(aCx, aObject, &isShared));
if (!viewedArrayBuffer) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}
MOZ_ASSERT(!JS::IsDetachedArrayBufferObject(viewedArrayBuffer));
@ -85,7 +83,6 @@ JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject aObject,
// O.[[ByteOffset]], O.[[ByteLength]], %ArrayBuffer%).
JS::RootedObject buffer(aCx, JS::CopyArrayBuffer(aCx, aObject));
if (!buffer) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}
@ -95,7 +92,6 @@ JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject aObject,
JS::RootedObject array(aCx, JS_NewUint8ArrayWithBuffer(
aCx, buffer, byteOffset, (int64_t)length));
if (!array) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}

Просмотреть файл

@ -22,7 +22,10 @@ JSObject* TransferArrayBuffer(JSContext* aCx, JS::Handle<JSObject*> aObject);
bool CanTransferArrayBuffer(JSContext* aCx, JS::Handle<JSObject*> aObject,
ErrorResult& aRv);
JSObject* CloneAsUint8Array(JS::HandleObject O, ErrorResult& aRv);
// If this returns null, it will leave a pending exception on aCx which
// must be handled by the caller (in the spec this is always the case
// currently).
JSObject* CloneAsUint8Array(JSContext* aCx, JS::HandleObject O);
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -30,6 +30,7 @@ struct ReadIntoRequest : public nsISupports,
// An algorithm taking a chunk or undefined, called when no chunks are
// available because the stream is closed
MOZ_CAN_RUN_SCRIPT
virtual void CloseSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& errorResult) = 0;

Просмотреть файл

@ -14,6 +14,7 @@
#include "js/friend/ErrorMessages.h"
#include "mozilla/AlreadyAddRefed.h"
#include "mozilla/Attributes.h"
#include "mozilla/ErrorResult.h"
#include "mozilla/dom/ByteStreamHelpers.h"
#include "mozilla/dom/Promise.h"
#include "mozilla/dom/PromiseNativeHandler.h"
@ -27,6 +28,7 @@
#include "mozilla/dom/ReadableStreamDefaultController.h"
#include "mozilla/dom/ReadableStreamGenericReader.h"
#include "mozilla/dom/ToJSValue.h"
#include "mozilla/dom/ScriptSettings.h"
#include "nsCycleCollectionParticipant.h"
#include "nsIGlobalObject.h"
#include "nsISupports.h"
@ -489,6 +491,7 @@ JSObject* ReadableByteStreamControllerConvertPullIntoDescriptor(
JSContext* aCx, PullIntoDescriptor* pullIntoDescriptor, ErrorResult& aRv);
// https://streams.spec.whatwg.org/#readable-stream-fulfill-read-into-request
MOZ_CAN_RUN_SCRIPT
void ReadableStreamFulfillReadIntoRequest(JSContext* aCx,
ReadableStream* aStream,
JS::HandleValue aChunk, bool done,
@ -519,6 +522,7 @@ void ReadableStreamFulfillReadIntoRequest(JSContext* aCx,
}
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor
MOZ_CAN_RUN_SCRIPT
void ReadableByteStreamControllerCommitPullIntoDescriptor(
JSContext* aCx, ReadableStream* aStream,
PullIntoDescriptor* pullIntoDescriptor, ErrorResult& aRv) {
@ -565,6 +569,7 @@ void ReadableByteStreamControllerCommitPullIntoDescriptor(
}
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue
MOZ_CAN_RUN_SCRIPT
void ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
JSContext* aCx, ReadableByteStreamController* aController,
ErrorResult& aRv) {
@ -579,7 +584,7 @@ void ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
}
// Step 2.2. Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
PullIntoDescriptor* pullIntoDescriptor =
RefPtr<PullIntoDescriptor> pullIntoDescriptor =
aController->PendingPullIntos().getFirst();
// Step 2.3. If
@ -600,8 +605,9 @@ void ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
// Step 2.3.2. Perform
// !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]],
// pullIntoDescriptor).
RefPtr<ReadableStream> stream(aController->Stream());
ReadableByteStreamControllerCommitPullIntoDescriptor(
aCx, aController->Stream(), pullIntoDescriptor, aRv);
aCx, stream, pullIntoDescriptor, aRv);
if (aRv.Failed()) {
return;
}
@ -1012,6 +1018,7 @@ JSObject* ReadableByteStreamControllerConvertPullIntoDescriptor(
}
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state
MOZ_CAN_RUN_SCRIPT
static void ReadableByteStreamControllerRespondInClosedState(
JSContext* aCx, ReadableByteStreamController* aController,
RefPtr<PullIntoDescriptor>& aFirstDescriptor, ErrorResult& aRv) {
@ -1019,7 +1026,7 @@ static void ReadableByteStreamControllerRespondInClosedState(
MOZ_ASSERT(aFirstDescriptor->BytesFilled() == 0);
// Step 2.
ReadableStream* stream = aController->Stream();
RefPtr<ReadableStream> stream = aController->Stream();
// Step 3.
if (ReadableStreamHasBYOBReader(stream)) {
@ -1621,5 +1628,139 @@ void ReadableByteStreamControllerPullInto(
ReadableByteStreamControllerCallPullIfNeeded(aCx, aController, aRv);
}
class ByteStreamStartPromiseNativeHandler final : public PromiseNativeHandler {
~ByteStreamStartPromiseNativeHandler() = default;
RefPtr<ReadableByteStreamController> mController;
public:
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
NS_DECL_CYCLE_COLLECTION_CLASS(ByteStreamStartPromiseNativeHandler)
explicit ByteStreamStartPromiseNativeHandler(
ReadableByteStreamController* aController)
: PromiseNativeHandler(), mController(aController) {}
MOZ_CAN_RUN_SCRIPT
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
MOZ_ASSERT(mController);
// https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller
//
// Step 16.1
mController->SetStarted(true);
// Step 16.2
mController->SetPulling(false);
// Step 16.3
mController->SetPullAgain(false);
// Step 16.4:
ErrorResult rv;
RefPtr<ReadableByteStreamController> stackController = mController;
ReadableByteStreamControllerCallPullIfNeeded(aCx, stackController, rv);
(void)rv.MaybeSetPendingException(aCx, "StartPromise Resolve Error");
}
void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
// https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller
// Step 17.1
ErrorResult rv;
ReadableByteStreamControllerError(mController, aValue, rv);
(void)rv.MaybeSetPendingException(aCx, "StartPromise Rejected Error");
}
};
// Cycle collection methods for promise handler
NS_IMPL_CYCLE_COLLECTION(ByteStreamStartPromiseNativeHandler, mController)
NS_IMPL_CYCLE_COLLECTING_ADDREF(ByteStreamStartPromiseNativeHandler)
NS_IMPL_CYCLE_COLLECTING_RELEASE(ByteStreamStartPromiseNativeHandler)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ByteStreamStartPromiseNativeHandler)
NS_INTERFACE_MAP_ENTRY(nsISupports)
NS_INTERFACE_MAP_END
// https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller
MOZ_CAN_RUN_SCRIPT
void SetUpReadableByteStreamController(
JSContext* aCx, ReadableStream* aStream,
ReadableByteStreamController* aController,
UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm,
double aHighWaterMark, Maybe<uint64_t> aAutoAllocateChunkSize,
ErrorResult& aRv) {
// Step 1. Assert: stream.[[controller]] is undefined.
MOZ_ASSERT(!aStream->Controller());
// Step 2. If autoAllocateChunkSize is not undefined,
if (aAutoAllocateChunkSize) {
// Step 2.1. Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
// Step 2.2. Assert: autoAllocateChunkSize is positive.
MOZ_ASSERT(*aAutoAllocateChunkSize >= 0);
}
// Step 3. Set controller.[[stream]] to stream.
aController->SetStream(aStream);
// Step 4. Set controller.[[pullAgain]] and controller.[[pulling]] to false.
aController->SetPullAgain(false);
aController->SetPulling(false);
// Step 5. Set controller.[[byobRequest]] to null.
aController->SetByobRequest(nullptr);
// Step 6. Perform !ResetQueue(controller).
ResetQueue(aController);
// Step 7. Set controller.[[closeRequested]] and controller.[[started]] to
// false.
aController->SetCloseRequested(false);
aController->SetStarted(false);
// Step 8. Set controller.[[strategyHWM]] to highWaterMark.
aController->SetStrategyHWM(aHighWaterMark);
// Step 9. Set controller.[[pullAlgorithm]] to pullAlgorithm.
aController->SetPullAlgorithm(aPullAlgorithm);
// Step 10. Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
aController->SetCancelAlgorithm(aCancelAlgorithm);
// Step 11. Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
aController->SetAutoAllocateChunkSize(aAutoAllocateChunkSize);
// Step 12. Set controller.[[pendingPullIntos]] to a new empty list.
aController->PendingPullIntos().clear();
// Step 13. Set stream.[[controller]] to controller.
aStream->SetController(aController);
// Step 14. Let startResult be the result of performing startAlgorithm.
// Default algorithm returns undefined.
JS::RootedValue startResult(aCx, JS::UndefinedValue());
if (aStartAlgorithm) {
// Strong Refs:
RefPtr<UnderlyingSourceStartCallbackHelper> startAlgorithm(aStartAlgorithm);
RefPtr<ReadableStreamController> controller(aController);
startAlgorithm->StartCallback(aCx, *controller, &startResult, aRv);
if (aRv.Failed()) {
return;
}
}
// Let startPromise be a promise resolved with startResult.
RefPtr<Promise> startPromise = Promise::Create(GetIncumbentGlobal(), aRv);
if (aRv.Failed()) {
return;
}
startPromise->MaybeResolve(startResult);
// Step 16+17
startPromise->AppendNativeHandler(
new ByteStreamStartPromiseNativeHandler(aController));
}
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -91,6 +91,7 @@ class ReadableByteStreamController final : public ReadableStreamController,
void ClearPendingPullIntos();
ReadableStream* Stream() const { return mStream; }
void SetStream(ReadableStream* aStream) { mStream = aStream; }
double QueueTotalSize() const { return mQueueTotalSize; }
void SetQueueTotalSize(double aQueueTotalSize) {
@ -348,6 +349,31 @@ extern void ReadableByteStreamControllerPullInto(
JS::HandleObject aView, ReadIntoRequest* aReadIntoRequest,
ErrorResult& aRv);
extern void ReadableByteStreamControllerError(
ReadableByteStreamController* aController, JS::HandleValue aValue,
ErrorResult& aRv);
extern void ReadableByteStreamControllerEnqueue(
JSContext* aCx, ReadableByteStreamController* aController,
JS::HandleObject aChunk, ErrorResult& aRv);
extern already_AddRefed<ReadableStreamBYOBRequest>
ReadableByteStreamControllerGetBYOBRequest(
JSContext* aCx, ReadableByteStreamController* aController);
extern void ReadableByteStreamControllerClose(
JSContext* aCx, ReadableByteStreamController* aController,
ErrorResult& aRv);
extern void SetUpReadableByteStreamController(
JSContext* aCx, ReadableStream* aStream,
ReadableByteStreamController* aController,
UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm,
double aHighWaterMark, Maybe<uint64_t> aAutoAllocateChunkSize,
ErrorResult& aRv);
} // namespace mozilla::dom
#endif

Просмотреть файл

@ -6,6 +6,7 @@
#include "mozilla/dom/ReadableStream.h"
#include "js/Array.h"
#include "js/Exception.h"
#include "js/PropertyAndElement.h"
#include "js/TypeDecls.h"
#include "js/Value.h"
@ -16,13 +17,16 @@
#include "mozilla/FloatingPoint.h"
#include "mozilla/HoldDropJSObjects.h"
#include "mozilla/dom/BindingCallContext.h"
#include "mozilla/dom/ByteStreamHelpers.h"
#include "mozilla/dom/ModuleMapKey.h"
#include "mozilla/dom/QueueWithSizes.h"
#include "mozilla/dom/QueuingStrategyBinding.h"
#include "mozilla/dom/ReadIntoRequest.h"
#include "mozilla/dom/ReadRequest.h"
#include "mozilla/dom/ReadableByteStreamController.h"
#include "mozilla/dom/ReadableStreamBYOBReader.h"
#include "mozilla/dom/ReadableStreamBinding.h"
#include "mozilla/dom/ReadableStreamController.h"
#include "mozilla/dom/ReadableStreamDefaultController.h"
#include "mozilla/dom/ReadableStreamDefaultReader.h"
#include "mozilla/dom/ReadableStreamTee.h"
@ -30,6 +34,7 @@
#include "mozilla/dom/StreamUtils.h"
#include "mozilla/dom/TeeState.h"
#include "mozilla/dom/UnderlyingSourceBinding.h"
#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
#include "nsCOMPtr.h"
#include "mozilla/dom/Promise-inl.h"
@ -341,7 +346,8 @@ already_AddRefed<Promise> ReadableStreamCancel(JSContext* aCx,
// Step 6.
if (reader && reader->IsBYOB()) {
for (auto* readIntoRequest : reader->AsBYOB()->ReadIntoRequests()) {
for (RefPtr<ReadIntoRequest> readIntoRequest :
reader->AsBYOB()->ReadIntoRequests()) {
readIntoRequest->CloseSteps(aCx, JS::UndefinedHandleValue, aRv);
if (aRv.Failed()) {
return nullptr;
@ -777,7 +783,7 @@ static void ReadableStreamTee(JSContext* aCx, ReadableStream* aStream,
// Step 2. Implicit.
// Step 3.
if (aStream->Controller()->IsByte()) {
aRv.ThrowTypeError("Cannot yet tee a byte stream controller");
ReadableByteStreamTee(aCx, aStream, aResult, aRv);
return;
}
// Step 4.
@ -806,5 +812,35 @@ void ReadableStreamAddReadIntoRequest(ReadableStream* aStream,
aReadIntoRequest);
}
// https://streams.spec.whatwg.org/#abstract-opdef-createreadablebytestream
MOZ_CAN_RUN_SCRIPT
already_AddRefed<ReadableStream> CreateReadableByteStream(
JSContext* aCx, nsIGlobalObject* aGlobal,
UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm, ErrorResult& aRv) {
// Step 1. Let stream be a new ReadableStream.
RefPtr<ReadableStream> stream = new ReadableStream(aGlobal);
// Perform ! InitializeReadableStream(stream).
InitializeReadableStream(stream);
// Let controller be a new ReadableByteStreamController.
RefPtr<ReadableByteStreamController> controller =
new ReadableByteStreamController(aGlobal);
// Perform ? SetUpReadableByteStreamController(stream, controller,
// startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined).
SetUpReadableByteStreamController(aCx, stream, controller, aStartAlgorithm,
aPullAlgorithm, aCancelAlgorithm, 0,
mozilla::Nothing(), aRv);
if (aRv.Failed()) {
return nullptr;
}
// Return stream.
return stream.forget();
}
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -156,6 +156,12 @@ inline bool ReadableStreamHasBYOBReader(ReadableStream* aStream) {
// https://streams.spec.whatwg.org/#readable-stream-has-default-reader
extern bool ReadableStreamHasDefaultReader(ReadableStream* aStream);
extern already_AddRefed<ReadableStream> CreateReadableByteStream(
JSContext* aCx, nsIGlobalObject* aGlobal,
UnderlyingSourceStartCallbackHelper* aStartAlgorithm,
UnderlyingSourcePullCallbackHelper* aPullAlgorithm,
UnderlyingSourceCancelCallbackHelper* aCancelAlgorithm, ErrorResult& aRv);
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -275,5 +275,22 @@ void ReadableStreamBYOBReader::ReleaseLock(ErrorResult& aRv) {
ReadableStreamReaderGenericRelease(this, aRv);
}
// https://streams.spec.whatwg.org/#acquire-readable-stream-byob-reader
already_AddRefed<ReadableStreamBYOBReader> AcquireReadableStreamBYOBReader(
JSContext* aCx, ReadableStream* aStream, ErrorResult& aRv) {
// Step 1. Let reader be a new ReadableStreamBYOBReader.
RefPtr<ReadableStreamBYOBReader> reader =
new ReadableStreamBYOBReader(aStream->GetParentObject());
// Step 2. Perform ? SetUpReadableStreamBYOBReader(reader, stream).
SetUpReadableStreamBYOBReader(aCx, reader, *aStream, aRv);
if (aRv.Failed()) {
return nullptr;
}
// Step 3. Return reader.
return reader.forget();
}
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -72,6 +72,15 @@ class ReadableStreamBYOBReader final : public ReadableStreamGenericReader,
LinkedList<RefPtr<ReadIntoRequest>> mReadIntoRequests = {};
};
already_AddRefed<ReadableStreamBYOBReader> AcquireReadableStreamBYOBReader(
JSContext* aCx, ReadableStream* aStream, ErrorResult& aRv);
void ReadableStreamBYOBReaderRead(JSContext* aCx,
ReadableStreamBYOBReader* aReader,
JS::HandleObject aView,
ReadIntoRequest* aReadIntoRequest,
ErrorResult& aRv);
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -4,9 +4,21 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "ReadableStream.h"
#include "js/Exception.h"
#include "js/TypeDecls.h"
#include "js/experimental/TypedData.h"
#include "mozilla/dom/ByteStreamHelpers.h"
#include "mozilla/dom/PromiseNativeHandler.h"
#include "mozilla/dom/ReadIntoRequest.h"
#include "mozilla/dom/ReadableStreamBYOBReader.h"
#include "mozilla/dom/ReadableStreamDefaultController.h"
#include "mozilla/dom/ReadableStreamGenericReader.h"
#include "mozilla/dom/ReadableStreamTee.h"
#include "mozilla/dom/ReadableStreamDefaultReader.h"
#include "mozilla/dom/ReadableByteStreamController.h"
#include "mozilla/dom/UnderlyingSourceBinding.h"
#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
#include "nsCycleCollectionParticipant.h"
#include "mozilla/CycleCollectedJSContext.h"
@ -203,4 +215,817 @@ void ReadableStreamDefaultTeeReadRequest::ErrorSteps(
mTeeState->SetReading(false);
}
void PullWithDefaultReader(JSContext* aCx, TeeState* aTeeState,
ErrorResult& aRv);
void PullWithBYOBReader(JSContext* aCx, TeeState* aTeeState,
JS::HandleObject aView, bool aForBranch2,
ErrorResult& aRv);
// Algorithm described in
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee, Steps
// 17 and Steps 18, genericized across branch numbers:
//
// Note: As specified this algorithm always returns a promise resolved with
// undefined, however as some places immediately discard said promise, we
// provide this version which doesn't return a promise.
//
// NativeByteStreamTeePullAlgorithm, which implements
// UnderlyingSourcePullCallbackHelper is the version which provies the return
// promise.
void ByteStreamTeePullAlgorithm(JSContext* aCx, size_t index,
TeeState* aTeeState, ErrorResult& aRv) {
MOZ_ASSERT(index == 1 || index == 2);
// Step {17,18}.1: If reading is true,
if (aTeeState->Reading()) {
// Step {17,18}.1.1: Set readAgainForBranch1 to true.
aTeeState->SetReadAgainForBranch(1, true);
// Step {17,18}.1.1: Return a promise resolved with undefined.
return;
}
// Step {17,18}.2: Set reading to true.
aTeeState->SetReading(true);
// Step {17,18}.3: Let byobRequest be
// !ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]]).
RefPtr<ReadableStreamBYOBRequest> byobRequest =
ReadableByteStreamControllerGetBYOBRequest(
aCx, aTeeState->Branch(index)->Controller()->AsByte());
// Step {17,18}.4: If byobRequest is null, perform pullWithDefaultReader.
if (!byobRequest) {
PullWithDefaultReader(aCx, aTeeState, aRv);
} else {
// Step {17,18}.5: Otherwise, perform pullWithBYOBReader, given
// byobRequest.[[view]] and false.
JS::RootedObject view(aCx, byobRequest->View());
PullWithBYOBReader(aCx, aTeeState, view, false, aRv);
}
// Step {17,18}.6: Return a promise resolved with undefined.
return;
}
class NativeByteStreamTeePullAlgorithm final
: public UnderlyingSourcePullCallbackHelper {
RefPtr<TeeState> mTeeState;
size_t mBranchIndex;
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(NativeByteStreamTeePullAlgorithm,
UnderlyingSourcePullCallbackHelper)
explicit NativeByteStreamTeePullAlgorithm(TeeState* aTeeState,
size_t aBranchIndex)
: mTeeState(aTeeState), mBranchIndex(aBranchIndex) {
MOZ_ASSERT(aBranchIndex == 1 || aBranchIndex == 2);
}
MOZ_CAN_RUN_SCRIPT
virtual already_AddRefed<Promise> PullCallback(
JSContext* aCx, ReadableStreamController& aController,
ErrorResult& aRv) override {
RefPtr<Promise> returnPromise = Promise::CreateResolvedWithUndefined(
mTeeState->GetStream()->GetParentObject(), aRv);
if (aRv.Failed()) {
return nullptr;
}
ByteStreamTeePullAlgorithm(aCx, mBranchIndex, mTeeState, aRv);
return returnPromise.forget();
}
protected:
~NativeByteStreamTeePullAlgorithm() = default;
};
NS_IMPL_CYCLE_COLLECTION_CLASS(NativeByteStreamTeePullAlgorithm)
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(
NativeByteStreamTeePullAlgorithm, UnderlyingSourcePullCallbackHelper)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mTeeState)
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(
NativeByteStreamTeePullAlgorithm, UnderlyingSourcePullCallbackHelper)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mTeeState)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
NS_IMPL_ADDREF_INHERITED(NativeByteStreamTeePullAlgorithm,
UnderlyingSourcePullCallbackHelper)
NS_IMPL_RELEASE_INHERITED(NativeByteStreamTeePullAlgorithm,
UnderlyingSourcePullCallbackHelper)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(NativeByteStreamTeePullAlgorithm)
NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourcePullCallbackHelper)
struct PullWithDefaultReaderReadRequest final : public ReadRequest {
RefPtr<TeeState> mTeeState;
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PullWithDefaultReaderReadRequest,
ReadRequest)
explicit PullWithDefaultReaderReadRequest(TeeState* aTeeState)
: mTeeState(aTeeState) {}
void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& aRv) override {
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
// Step 15.2.1
class PullWithDefaultReaderChunkStepMicrotask : public MicroTaskRunnable {
RefPtr<TeeState> mTeeState;
JS::PersistentRooted<JSObject*> mChunk;
public:
PullWithDefaultReaderChunkStepMicrotask(JSContext* aCx,
TeeState* aTeeState,
JS::Handle<JSObject*> aChunk)
: mTeeState(aTeeState), mChunk(aCx, aChunk) {}
MOZ_CAN_RUN_SCRIPT
void Run(AutoSlowOperation& aAso) override {
// Step Numbering in this function is relative to the Queue a microtask
// of the Chunk steps of 15.2.1 of
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
AutoJSAPI jsapi;
if (NS_WARN_IF(
!jsapi.Init(mTeeState->GetStream()->GetParentObject()))) {
return;
}
JSContext* cx = jsapi.cx();
// Step 1.
mTeeState->SetReadAgainForBranch1(false);
// Step 2.
mTeeState->SetReadAgainForBranch2(false);
// Step 3.
JS::RootedObject chunk1(cx, mChunk);
JS::RootedObject chunk2(cx, mChunk);
// Step 4.
ErrorResult rv;
if (!mTeeState->Canceled1() && !mTeeState->Canceled2()) {
// Step 4.1
JS::RootedObject cloneResult(cx, CloneAsUint8Array(cx, mChunk));
// Step 4.2
if (!cloneResult) {
// Step 4.2.1
JS::RootedValue exceptionValue(cx);
if (!JS_GetPendingException(cx, &exceptionValue)) {
// Uncatchable exception, simply return.
return;
}
ErrorResult rv;
ReadableByteStreamControllerError(
mTeeState->Branch1()->Controller()->AsByte(), exceptionValue,
rv);
if (rv.MaybeSetPendingException(
cx, "Error during ReadableByteStreamControllerError")) {
return;
}
// Step 4.2.2
ReadableByteStreamControllerError(
mTeeState->Branch2()->Controller()->AsByte(), exceptionValue,
rv);
if (rv.MaybeSetPendingException(
cx, "Error during ReadableByteStreamControllerError")) {
return;
}
// Step 4.2.3
RefPtr<ReadableStream> stream(mTeeState->GetStream());
RefPtr<Promise> promise =
ReadableStreamCancel(cx, stream, exceptionValue, rv);
if (rv.MaybeSetPendingException(
cx, "Error during ReadableByteStreamControllerError")) {
return;
}
mTeeState->CancelPromise()->MaybeResolve(promise);
// Step 4.2.4
return;
}
// Step 4.3
chunk2 = cloneResult;
}
// Step 5.
if (!mTeeState->Canceled1()) {
ErrorResult rv;
RefPtr<ReadableByteStreamController> controller(
mTeeState->Branch1()->Controller()->AsByte());
ReadableByteStreamControllerEnqueue(cx, controller, chunk1, rv);
if (rv.MaybeSetPendingException(
cx, "Error during ReadableByteStreamControllerEnqueue")) {
return;
}
}
// Step 6.
if (!mTeeState->Canceled2()) {
ErrorResult rv;
RefPtr<ReadableByteStreamController> controller(
mTeeState->Branch2()->Controller()->AsByte());
ReadableByteStreamControllerEnqueue(cx, controller, chunk2, rv);
if (rv.MaybeSetPendingException(
cx, "Error during ReadableByteStreamControllerEnqueue")) {
return;
}
}
// Step 7.
mTeeState->SetReading(false);
// Step 8.
if (mTeeState->ReadAgainForBranch1()) {
ByteStreamTeePullAlgorithm(cx, 1, mTeeState, rv);
}
}
bool Suppressed() override {
nsIGlobalObject* global = mTeeState->GetStream()->GetParentObject();
return global && global->IsInSyncOperation();
}
};
MOZ_ASSERT(aChunk.isObjectOrNull());
MOZ_ASSERT(aChunk.toObjectOrNull() != nullptr);
JS::RootedObject chunk(aCx, &aChunk.toObject());
RefPtr<PullWithDefaultReaderChunkStepMicrotask> task =
MakeRefPtr<PullWithDefaultReaderChunkStepMicrotask>(aCx, mTeeState,
chunk);
CycleCollectedJSContext::Get()->DispatchToMicroTask(task.forget());
}
void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {}
void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e,
ErrorResult& aRv) override {}
protected:
virtual ~PullWithDefaultReaderReadRequest() = default;
};
NS_IMPL_CYCLE_COLLECTION_INHERITED(PullWithDefaultReaderReadRequest,
ReadRequest, mTeeState)
NS_IMPL_ADDREF_INHERITED(PullWithDefaultReaderReadRequest, ReadRequest)
NS_IMPL_RELEASE_INHERITED(PullWithDefaultReaderReadRequest, ReadRequest)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PullWithDefaultReaderReadRequest)
NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee:
// Step 15.
MOZ_CAN_RUN_SCRIPT
void PullWithDefaultReader(JSContext* aCx, TeeState* aTeeState,
ErrorResult& aRv) {
// Step 15.1: Not implemented until BYOB Readers are implemented.
MOZ_ASSERT(!aTeeState->GetReader()->IsBYOB());
// Step 15.2
RefPtr<ReadRequest> readRequest =
new PullWithDefaultReaderReadRequest(aTeeState);
// Step 15.3
RefPtr<ReadableStreamGenericReader> reader = aTeeState->GetReader();
ReadableStreamDefaultReaderRead(aCx, reader, readRequest, aRv);
}
void ForwardReaderError(TeeState* aTeeState,
ReadableStreamGenericReader* aThisReader);
class PullWithBYOBReader_ReadIntoRequest final : public ReadIntoRequest {
RefPtr<TeeState> mTeeState;
bool mForBranch2;
virtual ~PullWithBYOBReader_ReadIntoRequest() = default;
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PullWithBYOBReader_ReadIntoRequest,
ReadIntoRequest)
explicit PullWithBYOBReader_ReadIntoRequest(TeeState* aTeeState,
bool aForBranch2)
: mTeeState(aTeeState), mForBranch2(aForBranch2) {}
void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& aRv) override {
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
// Step 16.4 chunk steps, Step 1.
class PullWithBYOBReaderChunkMicrotask : public MicroTaskRunnable {
RefPtr<TeeState> mTeeState;
JS::PersistentRooted<JSObject*> mChunk;
bool mForBranch2 = false;
public:
PullWithBYOBReaderChunkMicrotask(JSContext* aCx, TeeState* aTeeState,
JS::Handle<JSObject*> aChunk,
bool aForBranch2)
: mTeeState(aTeeState),
mChunk(aCx, aChunk),
mForBranch2(aForBranch2) {}
MOZ_CAN_RUN_SCRIPT
void Run(AutoSlowOperation& aAso) override {
AutoJSAPI jsapi;
if (NS_WARN_IF(
!jsapi.Init(mTeeState->GetStream()->GetParentObject()))) {
return;
}
JSContext* cx = jsapi.cx();
ErrorResult rv;
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
//
// Step Numbering below is relative to Chunk steps Microtask at
// Step 16.4 chunk steps, Step 1.
// Step 1.
mTeeState->SetReadAgainForBranch1(false);
// Step 2.
mTeeState->SetReadAgainForBranch2(false);
// Step 3.
bool byobCanceled =
mForBranch2 ? mTeeState->Canceled2() : mTeeState->Canceled1();
// Step 4.
bool otherCanceled =
!mForBranch2 ? mTeeState->Canceled2() : mTeeState->Canceled1();
// Rather than store byobBranch / otherBranch, we re-derive the pointers
// below, as borrowed from steps 16.2/16.3
ReadableStream* byobBranch =
mForBranch2 ? mTeeState->Branch2() : mTeeState->Branch1();
ReadableStream* otherBranch =
!mForBranch2 ? mTeeState->Branch2() : mTeeState->Branch1();
// Step 5.
if (!otherCanceled) {
// Step 5.1 (using the name clonedChunk because we don't want to name
// the completion record explicitly)
JS::RootedObject clonedChunk(cx, CloneAsUint8Array(cx, mChunk));
// Step 5.2. If cloneResult is an abrupt completion,
if (!clonedChunk) {
JS::RootedValue exception(cx);
if (!JS_GetPendingException(cx, &exception)) {
// Uncatchable exception. Return with pending
// exception still on context.
return;
}
// It's not expliclitly stated, but I assume the intention here is
// that we perform a normal completion here, so we clear the
// exception.
JS_ClearPendingException(cx);
// Step 5.2.1
ReadableByteStreamControllerError(
byobBranch->Controller()->AsByte(), exception, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
// Step 5.2.2.
ReadableByteStreamControllerError(
otherBranch->Controller()->AsByte(), exception, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
// Step 5.2.3.
RefPtr<ReadableStream> stream = mTeeState->GetStream();
RefPtr<Promise> cancelPromise =
ReadableStreamCancel(cx, stream, exception, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
mTeeState->CancelPromise()->MaybeResolve(cancelPromise);
// Step 5.2.4.
return;
}
// Step 5.3 (implicitly handled above by name selection)
// Step 5.4.
if (!byobCanceled) {
RefPtr<ReadableByteStreamController> controller(
byobBranch->Controller()->AsByte());
ReadableByteStreamControllerRespondWithNewView(cx, controller,
mChunk, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
}
// Step 5.4.
RefPtr<ReadableByteStreamController> otherController =
otherBranch->Controller()->AsByte();
ReadableByteStreamControllerEnqueue(cx, otherController, clonedChunk,
rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
// Step 6.
} else if (!byobCanceled) {
RefPtr<ReadableByteStreamController> byobController =
byobBranch->Controller()->AsByte();
ReadableByteStreamControllerRespondWithNewView(cx, byobController,
mChunk, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
}
// Step 7.
mTeeState->SetReading(false);
// Step 8.
if (mTeeState->ReadAgainForBranch1()) {
ByteStreamTeePullAlgorithm(cx, 1, mTeeState, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
} else if (mTeeState->ReadAgainForBranch2()) {
ByteStreamTeePullAlgorithm(cx, 1, mTeeState, rv);
if (rv.MaybeSetPendingException(cx)) {
return;
}
}
}
bool Suppressed() override {
nsIGlobalObject* global = mTeeState->GetStream()->GetParentObject();
return global && global->IsInSyncOperation();
}
};
MOZ_ASSERT(aChunk.isObjectOrNull());
MOZ_ASSERT(aChunk.toObjectOrNull());
JS::RootedObject chunk(aCx, aChunk.toObjectOrNull());
RefPtr<PullWithBYOBReaderChunkMicrotask> task =
MakeRefPtr<PullWithBYOBReaderChunkMicrotask>(aCx, mTeeState, chunk,
mForBranch2);
CycleCollectedJSContext::Get()->DispatchToMicroTask(task.forget());
}
MOZ_CAN_RUN_SCRIPT
void CloseSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
ErrorResult& aRv) override {
// Step 1.
mTeeState->SetReading(false);
// Step 2.
bool byobCanceled =
mForBranch2 ? mTeeState->Canceled2() : mTeeState->Canceled1();
// Step 3.
bool otherCanceled =
!mForBranch2 ? mTeeState->Canceled2() : mTeeState->Canceled1();
// Rather than store byobBranch / otherBranch, we re-derive the pointers
// below, as borrowed from steps 16.2/16.3
ReadableStream* byobBranch =
mForBranch2 ? mTeeState->Branch2() : mTeeState->Branch1();
ReadableStream* otherBranch =
!mForBranch2 ? mTeeState->Branch2() : mTeeState->Branch1();
// Step 4.
if (!byobCanceled) {
ReadableByteStreamControllerClose(aCx, byobBranch->Controller()->AsByte(),
aRv);
if (aRv.Failed()) {
return;
}
}
// Step 5.
if (!otherCanceled) {
ReadableByteStreamControllerClose(
aCx, otherBranch->Controller()->AsByte(), aRv);
if (aRv.Failed()) {
return;
}
}
// Step 6.
if (!aChunk.isUndefined()) {
MOZ_ASSERT(aChunk.isObject());
MOZ_ASSERT(aChunk.toObjectOrNull());
JS::RootedObject chunkObject(aCx, &aChunk.toObject());
MOZ_ASSERT(JS_IsArrayBufferViewObject(chunkObject));
// Step 6.1.
MOZ_ASSERT(JS_GetArrayBufferViewByteLength(chunkObject) == 0);
// Step 6.2.
if (!byobCanceled) {
RefPtr<ReadableByteStreamController> byobController(
byobBranch->Controller()->AsByte());
ReadableByteStreamControllerRespondWithNewView(aCx, byobController,
chunkObject, aRv);
if (aRv.Failed()) {
return;
}
}
// Step 6.3
if (!otherCanceled &&
!otherBranch->Controller()->AsByte()->PendingPullIntos().isEmpty()) {
RefPtr<ReadableByteStreamController> otherController(
otherBranch->Controller()->AsByte());
ReadableByteStreamControllerRespond(aCx, otherController, 0, aRv);
if (aRv.Failed()) {
return;
}
}
}
// Step 7.
if (!byobCanceled || !otherCanceled) {
mTeeState->CancelPromise()->MaybeResolveWithUndefined();
}
}
void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e,
ErrorResult& errorResult) override {
// Step 1.
mTeeState->SetReading(false);
}
};
NS_IMPL_CYCLE_COLLECTION_INHERITED(PullWithBYOBReader_ReadIntoRequest,
ReadIntoRequest, mTeeState)
NS_IMPL_ADDREF_INHERITED(PullWithBYOBReader_ReadIntoRequest, ReadIntoRequest)
NS_IMPL_RELEASE_INHERITED(PullWithBYOBReader_ReadIntoRequest, ReadIntoRequest)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PullWithBYOBReader_ReadIntoRequest)
NS_INTERFACE_MAP_END_INHERITING(ReadIntoRequest)
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
// Step 16.
MOZ_CAN_RUN_SCRIPT
void PullWithBYOBReader(JSContext* aCx, TeeState* aTeeState,
JS::HandleObject aView, bool aForBranch2,
ErrorResult& aRv) {
// Step 16.1
if (aTeeState->GetReader()->IsDefault()) {
// Step 16.1.1
MOZ_ASSERT(aTeeState->GetDefaultReader()->ReadRequests().isEmpty());
// Step 16.1.2. Perform !ReadableStreamReaderGenericRelease(reader).
ReadableStreamReaderGenericRelease(aTeeState->GetReader(), aRv);
if (aRv.Failed()) {
return;
}
// Step 16.1.3. Set reader to !AcquireReadableStreamBYOBReader(stream).
RefPtr<ReadableStreamBYOBReader> reader =
AcquireReadableStreamBYOBReader(aCx, aTeeState->GetStream(), aRv);
if (aRv.Failed()) {
return;
}
aTeeState->SetReader(reader);
// Step 16.1.4. Perform forwardReaderError, given reader.
ForwardReaderError(aTeeState, reader);
}
// Step 16.2. Unused in this function, moved to consumers.
// Step 16.3. Unused in this function, moved to consumers.
// Step 16.4.
RefPtr<ReadIntoRequest> readIntoRequest =
new PullWithBYOBReader_ReadIntoRequest(aTeeState, aForBranch2);
// Step 16.5.
RefPtr<ReadableStreamBYOBReader> byobReader =
aTeeState->GetReader()->AsBYOB();
ReadableStreamBYOBReaderRead(aCx, byobReader, aView, readIntoRequest, aRv);
}
class ForwardReaderErrorPromiseHandler final : public PromiseNativeHandler {
~ForwardReaderErrorPromiseHandler() = default;
RefPtr<TeeState> mTeeState;
RefPtr<ReadableStreamGenericReader> mReader;
public:
NS_DECL_CYCLE_COLLECTING_ISUPPORTS
NS_DECL_CYCLE_COLLECTION_CLASS(ForwardReaderErrorPromiseHandler)
ForwardReaderErrorPromiseHandler(TeeState* aTeeState,
ReadableStreamGenericReader* aReader)
: mTeeState(aTeeState), mReader(aReader) {}
MOZ_CAN_RUN_SCRIPT
void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue) override {
}
MOZ_CAN_RUN_SCRIPT
virtual void RejectedCallback(JSContext* aCx,
JS::Handle<JS::Value> aValue) override {
// Step 14.1.1
if (mTeeState->GetReader() != mReader) {
return;
}
ErrorResult rv;
// Step 14.1.2: Perform
// !ReadableByteStreamControllerError(branch1.[[controller]], r).
MOZ_ASSERT(mTeeState->Branch1()->Controller()->IsByte());
ReadableByteStreamControllerError(
mTeeState->Branch1()->Controller()->AsByte(), aValue, rv);
if (rv.MaybeSetPendingException(
aCx, "ReadableByteStreamTee: Error during forwardReaderError")) {
return;
}
// Step 14.1.3: Perform
// !ReadableByteStreamControllerError(branch2.[[controller]], r).
MOZ_ASSERT(mTeeState->Branch2()->Controller()->IsByte());
ReadableByteStreamControllerError(
mTeeState->Branch2()->Controller()->AsByte(), aValue, rv);
if (rv.MaybeSetPendingException(
aCx, "ReadableByteStreamTee: Error during forwardReaderError")) {
return;
}
// Step 14.1.4: If canceled1 is false or canceled2 is false, resolve
// cancelPromise with undefined.
if (!mTeeState->Canceled1() || !mTeeState->Canceled2()) {
mTeeState->CancelPromise()->MaybeResolveWithUndefined();
}
}
};
NS_IMPL_CYCLE_COLLECTION(ForwardReaderErrorPromiseHandler, mTeeState, mReader)
NS_IMPL_CYCLE_COLLECTING_ADDREF(ForwardReaderErrorPromiseHandler)
NS_IMPL_CYCLE_COLLECTING_RELEASE(ForwardReaderErrorPromiseHandler)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ForwardReaderErrorPromiseHandler)
NS_INTERFACE_MAP_ENTRY(nsISupports)
NS_INTERFACE_MAP_END
// See https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
// Step 14.
void ForwardReaderError(TeeState* aTeeState,
ReadableStreamGenericReader* aThisReader) {
aThisReader->ClosedPromise()->AppendNativeHandler(
new ForwardReaderErrorPromiseHandler(aTeeState, aThisReader));
}
class ReadableByteStreamTeeCancelAlgorithm final
: public UnderlyingSourceCancelCallbackHelper {
RefPtr<TeeState> mTeeState;
size_t mStreamIndex;
size_t otherStream() {
if (mStreamIndex == 1) {
return 2;
}
MOZ_ASSERT(mStreamIndex == 2);
return 1;
}
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(ReadableByteStreamTeeCancelAlgorithm,
UnderlyingSourceCancelCallbackHelper)
explicit ReadableByteStreamTeeCancelAlgorithm(TeeState* aTeeState,
size_t aStreamIndex)
: mTeeState(aTeeState), mStreamIndex(aStreamIndex) {}
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
// Steps 19 and 20 both use this class.
MOZ_CAN_RUN_SCRIPT
virtual already_AddRefed<Promise> CancelCallback(
JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
ErrorResult& aRv) override {
// Step 1.
mTeeState->SetCanceled(mStreamIndex, true);
// Step 2.
mTeeState->SetReason(mStreamIndex, aReason.Value());
// Step 3.
if (mTeeState->Canceled(otherStream())) {
// Step 3.1
JS::RootedObject compositeReason(aCx, JS::NewArrayObject(aCx, 2));
if (!compositeReason) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}
JS::RootedValue reason1(aCx, mTeeState->Reason1());
if (!JS_SetElement(aCx, compositeReason, 0, reason1)) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}
JS::RootedValue reason2(aCx, mTeeState->Reason2());
if (!JS_SetElement(aCx, compositeReason, 1, reason2)) {
aRv.StealExceptionFromJSContext(aCx);
return nullptr;
}
// Step 3.2
JS::RootedValue compositeReasonValue(aCx,
JS::ObjectValue(*compositeReason));
RefPtr<ReadableStream> stream(mTeeState->GetStream());
RefPtr<Promise> cancelResult =
ReadableStreamCancel(aCx, stream, compositeReasonValue, aRv);
if (aRv.Failed()) {
return nullptr;
}
// Step 3.3
mTeeState->CancelPromise()->MaybeResolve(cancelResult);
}
// Step 4.
return do_AddRef(mTeeState->CancelPromise());
}
protected:
~ReadableByteStreamTeeCancelAlgorithm() = default;
};
NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableByteStreamTeeCancelAlgorithm)
NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED(
ReadableByteStreamTeeCancelAlgorithm, UnderlyingSourceCancelCallbackHelper)
NS_IMPL_CYCLE_COLLECTION_UNLINK(mTeeState)
NS_IMPL_CYCLE_COLLECTION_UNLINK_END
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED(
ReadableByteStreamTeeCancelAlgorithm, UnderlyingSourceCancelCallbackHelper)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mTeeState)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
NS_IMPL_ADDREF_INHERITED(ReadableByteStreamTeeCancelAlgorithm,
UnderlyingSourceCancelCallbackHelper)
NS_IMPL_RELEASE_INHERITED(ReadableByteStreamTeeCancelAlgorithm,
UnderlyingSourceCancelCallbackHelper)
NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableByteStreamTeeCancelAlgorithm)
NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceCancelCallbackHelper)
// https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee
MOZ_CAN_RUN_SCRIPT
void ReadableByteStreamTee(JSContext* aCx, ReadableStream* aStream,
nsTArray<RefPtr<ReadableStream>>& aResult,
ErrorResult& aRv) {
// Step 1. Implicit
// Step 2.
MOZ_ASSERT(aStream->Controller()->IsByte());
// Step 3-13 captured as part of TeeState allocation
RefPtr<TeeState> teeState = TeeState::Create(aCx, aStream, false, aRv);
if (aRv.Failed()) {
return;
}
// Step 14: See ForwardReaderError
// Step 15. See PullWithDefaultReader
// Step 16. See PullWithBYOBReader
// Step 17,18. See {Native,}ByteStreamTeePullAlgorithm
// Step 19,20. See ReadableByteStreamTeeCancelAlgorithm
// Step 21. Elided because consumers know how to handle nullptr correctly.
// Step 22.
nsCOMPtr<nsIGlobalObject> global = aStream->GetParentObject();
RefPtr<UnderlyingSourcePullCallbackHelper> pull1Algorithm =
new NativeByteStreamTeePullAlgorithm(teeState, 1);
RefPtr<UnderlyingSourceCancelCallbackHelper> cancel1Algorithm =
new ReadableByteStreamTeeCancelAlgorithm(teeState, 1);
teeState->SetBranch1(CreateReadableByteStream(
aCx, global, nullptr, pull1Algorithm, cancel1Algorithm, aRv));
if (aRv.Failed()) {
return;
}
// Step 23.
RefPtr<UnderlyingSourcePullCallbackHelper> pull2Algorithm =
new NativeByteStreamTeePullAlgorithm(teeState, 2);
RefPtr<UnderlyingSourceCancelCallbackHelper> cancel2Algorithm =
new ReadableByteStreamTeeCancelAlgorithm(teeState, 2);
teeState->SetBranch2(CreateReadableByteStream(
aCx, global, nullptr, pull2Algorithm, cancel2Algorithm, aRv));
if (aRv.Failed()) {
return;
}
// Step 24.
ForwardReaderError(teeState, teeState->GetReader());
// Step 25.
aResult.AppendElement(teeState->Branch1());
aResult.AppendElement(teeState->Branch2());
}
} // namespace mozilla::dom

Просмотреть файл

@ -70,5 +70,9 @@ struct ReadableStreamDefaultTeeReadRequest final : public ReadRequest {
virtual ~ReadableStreamDefaultTeeReadRequest() = default;
};
void ReadableByteStreamTee(JSContext* aCx, ReadableStream* aStream,
nsTArray<RefPtr<ReadableStream>>& aResult,
ErrorResult& aRv);
} // namespace mozilla::dom
#endif

Просмотреть файл

@ -84,5 +84,4 @@ void TeeState::SetPullAlgorithm(
ReadableStreamDefaultTeePullAlgorithm* aPullAlgorithm) {
mPullAlgorithm = aPullAlgorithm;
}
} // namespace mozilla::dom

Просмотреть файл

@ -64,12 +64,26 @@ struct TeeState : public nsISupports {
bool Canceled2() const { return mCanceled2; }
void SetCanceled2(bool aCanceled2) { mCanceled2 = aCanceled2; }
void SetCanceled(size_t aStreamIndex, bool aCanceled) {
MOZ_ASSERT(aStreamIndex == 1 || aStreamIndex == 2);
aStreamIndex == 1 ? SetCanceled1(aCanceled) : SetCanceled2(aCanceled);
}
bool Canceled(size_t aStreamIndex) {
MOZ_ASSERT(aStreamIndex == 1 || aStreamIndex == 2);
return aStreamIndex == 1 ? Canceled1() : Canceled2();
}
JS::Value Reason1() const { return mReason1; }
void SetReason1(JS::HandleValue aReason1) { mReason1 = aReason1; }
JS::Value Reason2() const { return mReason2; }
void SetReason2(JS::HandleValue aReason2) { mReason2 = aReason2; }
void SetReason(size_t aStreamIndex, JS::HandleValue aReason) {
MOZ_ASSERT(aStreamIndex == 1 || aStreamIndex == 2);
aStreamIndex == 1 ? SetReason1(aReason) : SetReason2(aReason);
}
ReadableStream* Branch1() const { return mBranch1; }
void SetBranch1(already_AddRefed<ReadableStream> aBranch1) {
mBranch1 = aBranch1;
@ -95,6 +109,22 @@ struct TeeState : public nsISupports {
return mPullAlgorithm;
}
// Some code is better served by using an index into various internal slots to
// avoid duplication: Here we provide alternative accessors for that case.
ReadableStream* Branch(size_t index) const {
MOZ_ASSERT(index == 1 || index == 2);
return index == 1 ? Branch1() : Branch2();
}
void SetReadAgainForBranch(size_t index, bool value) {
MOZ_ASSERT(index == 1 || index == 2);
if (index == 1) {
SetReadAgainForBranch1(value);
return;
}
SetReadAgainForBranch2(value);
}
private:
TeeState(JSContext* aCx, ReadableStream* aStream, bool aCloneForBranch2);