Bug 1815997: Add InputToReadableStream helpers r=saschanaz

Differential Revision: https://phabricator.services.mozilla.com/D169415
This commit is contained in:
Randell Jesup 2023-02-23 17:12:25 +00:00
Родитель 71f1cfb1bc
Коммит 70a879a3fb
2 изменённых файлов: 328 добавлений и 0 удалений

Просмотреть файл

@ -5,9 +5,11 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "StreamUtils.h"
#include "mozilla/dom/ReadableStream.h"
#include "mozilla/dom/ReadableStreamDefaultController.h"
#include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
#include "mozilla/dom/UnderlyingSourceBinding.h"
#include "js/experimental/TypedData.h"
namespace mozilla::dom {
@ -144,4 +146,254 @@ already_AddRefed<Promise> UnderlyingSourceAlgorithmsWrapper::CancelCallback(
aRv);
}
NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED_0(
InputToReadableStreamAlgorithms, UnderlyingSourceAlgorithmsWrapper)
NS_IMPL_CYCLE_COLLECTION_INHERITED(InputToReadableStreamAlgorithms,
UnderlyingSourceAlgorithmsWrapper, mStream)
already_AddRefed<Promise> InputToReadableStreamAlgorithms::PullCallbackImpl(
JSContext* aCx, ReadableStreamController& aController, ErrorResult& aRv) {
MOZ_ASSERT(aController.IsByte());
ReadableStream* stream = aController.Stream();
MOZ_ASSERT(stream);
MOZ_DIAGNOSTIC_ASSERT(stream->Disturbed());
MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
mState == eChecking || mState == eReading);
RefPtr<Promise> resolvedWithUndefinedPromise =
Promise::CreateResolvedWithUndefined(aController.GetParentObject(), aRv);
if (aRv.Failed()) {
return nullptr;
}
if (mState == eReading) {
// We are already reading data.
return resolvedWithUndefinedPromise.forget();
}
if (mState == eChecking) {
// If we are looking for more data, there is nothing else we should do:
// let's move this checking operation in a reading.
MOZ_ASSERT(mInput);
mState = eReading;
return resolvedWithUndefinedPromise.forget();
}
mState = eReading;
MOZ_DIAGNOSTIC_ASSERT(mInput);
nsresult rv = mInput->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, stream, rv);
return nullptr;
}
// All good.
return resolvedWithUndefinedPromise.forget();
}
NS_IMETHODIMP
InputToReadableStreamAlgorithms::OnInputStreamReady(
nsIAsyncInputStream* aStream) {
MOZ_DIAGNOSTIC_ASSERT(aStream);
// Already closed. We have nothing else to do here.
if (mState == eClosed) {
return NS_OK;
}
AutoEntryScript aes(mStream->GetParentObject(),
"InputToReadableStream data available");
MOZ_DIAGNOSTIC_ASSERT(mInput);
MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);
JSContext* cx = aes.cx();
uint64_t size = 0;
nsresult rv = mInput->Available(&size);
if (NS_SUCCEEDED(rv) && size == 0) {
// In theory this should not happen. If size is 0, the stream should be
// considered closed.
rv = NS_BASE_STREAM_CLOSED;
}
// No warning for stream closed.
if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(cx, mStream, rv);
return NS_OK;
}
// This extra checking is completed. Let's wait for the next read request.
if (mState == eChecking) {
mState = eWaiting;
return NS_OK;
}
mState = eWriting;
ErrorResult errorResult;
EnqueueChunkWithSizeIntoStream(cx, mStream, size, errorResult);
errorResult.WouldReportJSException();
if (errorResult.Failed()) {
ErrorPropagation(cx, mStream, errorResult.StealNSResult());
return NS_OK;
}
// The previous call can execute JS (even up to running a nested event
// loop), so |mState| can't be asserted to have any particular value, even
// if the previous call succeeds.
return NS_OK;
}
void InputToReadableStreamAlgorithms::WriteIntoReadRequestBuffer(
JSContext* aCx, ReadableStream* aStream, JS::Handle<JSObject*> aBuffer,
uint32_t aLength, uint32_t* aByteWritten) {
MOZ_DIAGNOSTIC_ASSERT(aBuffer);
MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
MOZ_DIAGNOSTIC_ASSERT(mInput);
MOZ_DIAGNOSTIC_ASSERT(mState == eWriting);
mState = eChecking;
uint32_t written;
nsresult rv;
void* buffer;
{
// Bug 1754513: Hazard suppression.
//
// Because mInput->Read is detected as possibly GCing by the
// current state of our static hazard analysis, we need to do the
// suppression here. This can be removed with future improvements
// to the static analysis.
JS::AutoSuppressGCAnalysis suppress;
JS::AutoCheckCannotGC noGC;
bool isSharedMemory;
buffer = JS_GetArrayBufferViewData(aBuffer, &isSharedMemory, noGC);
MOZ_ASSERT(!isSharedMemory);
rv = mInput->Read(static_cast<char*>(buffer), aLength, &written);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, aStream, rv);
return;
}
}
*aByteWritten = written;
if (written == 0) {
CloseAndReleaseObjects(aCx, aStream);
return;
}
rv = mInput->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, aStream, rv);
return;
}
// All good.
}
// Whenever one or more bytes are available and stream is not
// errored, enqueue a Uint8Array wrapping an ArrayBuffer containing the
// available bytes into stream.
void InputToReadableStreamAlgorithms::EnqueueChunkWithSizeIntoStream(
JSContext* aCx, ReadableStream* aStream, uint64_t aAvailableData,
ErrorResult& aRv) {
// To avoid OOMing up on huge amounts of available data on a 32 bit system,
// as well as potentially overflowing nsIInputStream's Read method's
// parameter, let's limit our maximum chunk size to 256MB.
uint32_t ableToRead =
std::min(static_cast<uint64_t>(256 * 1024 * 1024), aAvailableData);
// Create Chunk
aRv.MightThrowJSException();
JS::Rooted<JSObject*> chunk(aCx, JS_NewUint8Array(aCx, ableToRead));
if (!chunk) {
aRv.StealExceptionFromJSContext(aCx);
return;
}
{
uint32_t bytesWritten = 0;
WriteIntoReadRequestBuffer(aCx, aStream, chunk, ableToRead, &bytesWritten);
// If bytesWritten is zero, then the stream has been closed; return
// rather than enqueueing a chunk filled with zeros.
if (bytesWritten == 0) {
return;
}
// If we don't read every byte we've allocated in the Uint8Array
// we risk enqueuing a chunk that is padded with trailing zeros,
// corrupting future processing of the chunks:
MOZ_DIAGNOSTIC_ASSERT((ableToRead - bytesWritten) == 0);
}
MOZ_ASSERT(aStream->Controller()->IsByte());
JS::Rooted<JS::Value> chunkValue(aCx);
chunkValue.setObject(*chunk);
aStream->EnqueueNative(aCx, chunkValue, aRv);
}
void InputToReadableStreamAlgorithms::CloseAndReleaseObjects(
JSContext* aCx, ReadableStream* aStream) {
MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
mState = eClosed;
ReleaseObjects();
if (aStream->State() == ReadableStream::ReaderState::Readable) {
IgnoredErrorResult rv;
aStream->CloseNative(aCx, rv);
NS_WARNING_ASSERTION(!rv.Failed(), "Failed to Close Stream");
}
}
void InputToReadableStreamAlgorithms::ReleaseObjects() {
mInput->CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
void InputToReadableStreamAlgorithms::ErrorPropagation(JSContext* aCx,
ReadableStream* aStream,
nsresult aError) {
// Nothing to do.
if (mState == eClosed) {
return;
}
// Let's close the stream.
if (aError == NS_BASE_STREAM_CLOSED) {
CloseAndReleaseObjects(aCx, aStream);
return;
}
// Let's use a generic error.
ErrorResult rv;
// XXXbz can we come up with a better error message here to tell the
// consumer what went wrong?
rv.ThrowTypeError("Error in input stream");
JS::Rooted<JS::Value> errorValue(aCx);
bool ok = ToJSValue(aCx, std::move(rv), &errorValue);
MOZ_RELEASE_ASSERT(ok, "ToJSValue never fails for ErrorResult");
{
// This will be ignored if it's already errored.
IgnoredErrorResult rv;
aStream->ErrorNative(aCx, errorValue, rv);
NS_WARNING_ASSERTION(!rv.Failed(), "Failed to error InputToReadableStream");
}
MOZ_ASSERT(mInput);
CloseAndReleaseObjects(aCx, aStream);
}
} // namespace mozilla::dom

Просмотреть файл

@ -10,6 +10,7 @@
#include "mozilla/HoldDropJSObjects.h"
#include "mozilla/dom/Promise.h"
#include "mozilla/dom/UnderlyingSourceBinding.h"
#include "nsIAsyncInputStream.h"
#include "nsISupports.h"
#include "nsISupportsImpl.h"
@ -22,10 +23,13 @@
* WebIDL as if they were methods. So we have to preserve the underlying object
* to use as the This value on invocation.
*/
enum class nsresult : uint32_t;
namespace mozilla::dom {
class BodyStreamHolder;
class ReadableStreamController;
class ReadableStream;
class UnderlyingSourceAlgorithmsBase : public nsISupports {
public:
@ -155,6 +159,78 @@ class UnderlyingSourceAlgorithmsWrapper
}
};
class InputToReadableStreamAlgorithms final
: public UnderlyingSourceAlgorithmsWrapper,
public nsIInputStreamCallback {
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(InputToReadableStreamAlgorithms,
UnderlyingSourceAlgorithmsWrapper)
InputToReadableStreamAlgorithms(nsIAsyncInputStream* aInput,
ReadableStream* aStream)
: mState(eInitializing),
mOwningEventTarget(GetCurrentSerialEventTarget()),
mInput(aInput),
mStream(aStream) {}
// Streams algorithms
already_AddRefed<Promise> PullCallbackImpl(
JSContext* aCx, ReadableStreamController& aController,
ErrorResult& aRv) override;
void ReleaseObjects() override;
private:
~InputToReadableStreamAlgorithms() override = default;
MOZ_CAN_RUN_SCRIPT_BOUNDARY void CloseAndReleaseObjects(
JSContext* aCx, ReadableStream* aStream);
void WriteIntoReadRequestBuffer(JSContext* aCx, ReadableStream* aStream,
JS::Handle<JSObject*> aBuffer,
uint32_t aLength, uint32_t* aByteWritten);
MOZ_CAN_RUN_SCRIPT_BOUNDARY void EnqueueChunkWithSizeIntoStream(
JSContext* aCx, ReadableStream* aStream, uint64_t aAvailableData,
ErrorResult& aRv);
void ErrorPropagation(JSContext* aCx, ReadableStream* aStream,
nsresult aError);
// Common methods
enum State {
// This is the beginning state before any reading operation.
eInitializing,
// RequestDataCallback has not been called yet. We haven't started to read
// data from the stream yet.
eWaiting,
// We are reading data in a separate I/O thread.
eReading,
// We are ready to write something in the JS Buffer.
eWriting,
// After a writing, we want to check if the stream is closed. After the
// check, we go back to eWaiting. If a reading request happens in the
// meantime, we move to eReading state.
eChecking,
// Operation completed.
eClosed,
};
State mState;
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
nsCOMPtr<nsIAsyncInputStream> mInput;
RefPtr<ReadableStream> mStream;
};
} // namespace mozilla::dom
#endif