/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "FetchStream.h" #include "mozilla/dom/DOMException.h" #include "nsITransport.h" #include "nsIStreamTransportService.h" #include "nsProxyRelease.h" #include "WorkerPrivate.h" #include "Workers.h" #define FETCH_STREAM_FLAG 0 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); namespace mozilla { namespace dom { using namespace workers; namespace { class FetchStreamWorkerHolder final : public WorkerHolder { public: explicit FetchStreamWorkerHolder(FetchStream* aStream) : WorkerHolder("FetchStreamWorkerHolder", WorkerHolder::Behavior::AllowIdleShutdownStart) , mStream(aStream) , mWasNotified(false) {} bool Notify(Status aStatus) override { if (!mWasNotified) { mWasNotified = true; mStream->Close(); } return true; } WorkerPrivate* GetWorkerPrivate() const { return mWorkerPrivate; } private: RefPtr mStream; bool mWasNotified; }; class FetchStreamWorkerHolderShutdown final : public WorkerControlRunnable { public: FetchStreamWorkerHolderShutdown(WorkerPrivate* aWorkerPrivate, UniquePtr&& aHolder, nsCOMPtr&& aGlobal, RefPtr&& aStreamHolder) : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount) , mHolder(Move(aHolder)) , mGlobal(Move(aGlobal)) , mStreamHolder(Move(aStreamHolder)) {} bool WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override { mHolder = nullptr; mGlobal = nullptr; mStreamHolder->NullifyStream(); mStreamHolder = nullptr; return true; } // This runnable starts from a JS Thread. We need to disable a couple of // assertions overring the following methods. bool PreDispatch(WorkerPrivate* aWorkerPrivate) override { return true; } void PostDispatch(WorkerPrivate* aWorkerPrivate, bool aDispatchResult) override {} private: UniquePtr mHolder; nsCOMPtr mGlobal; RefPtr mStreamHolder; }; } // anonymous NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback, nsIObserver, nsISupportsWeakReference) /* static */ void FetchStream::Create(JSContext* aCx, FetchStreamHolder* aStreamHolder, nsIGlobalObject* aGlobal, nsIInputStream* aInputStream, JS::MutableHandle aStream, ErrorResult& aRv) { MOZ_DIAGNOSTIC_ASSERT(aCx); MOZ_DIAGNOSTIC_ASSERT(aStreamHolder); MOZ_DIAGNOSTIC_ASSERT(aInputStream); RefPtr stream = new FetchStream(aGlobal, aStreamHolder, aInputStream); if (NS_IsMainThread()) { nsCOMPtr os = mozilla::services::GetObserverService(); if (NS_WARN_IF(!os)) { aRv.Throw(NS_ERROR_FAILURE); return; } aRv = os->AddObserver(stream, DOM_WINDOW_DESTROYED_TOPIC, true); if (NS_WARN_IF(aRv.Failed())) { return; } } else { WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx); MOZ_ASSERT(workerPrivate); UniquePtr holder( new FetchStreamWorkerHolder(stream)); if (NS_WARN_IF(!holder->HoldWorker(workerPrivate, Closing))) { aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR); return; } // Note, this will create a ref-cycle between the holder and the stream. // The cycle is broken when the stream is closed or the worker begins // shutting down. stream->mWorkerHolder = Move(holder); } if (!JS::HasReadableStreamCallbacks(aCx)) { JS::SetReadableStreamCallbacks(aCx, &FetchStream::RequestDataCallback, &FetchStream::WriteIntoReadRequestCallback, &FetchStream::CancelCallback, &FetchStream::ClosedCallback, &FetchStream::ErroredCallback, &FetchStream::FinalizeCallback); } JS::Rooted body(aCx, JS::NewReadableExternalSourceStreamObject(aCx, stream, FETCH_STREAM_FLAG)); if (!body) { aRv.StealExceptionFromJSContext(aCx); return; } // This will be released in FetchStream::FinalizeCallback(). We are // guaranteed the jsapi will call FinalizeCallback when ReadableStream // js object is finalized. NS_ADDREF(stream.get()); aStream.set(body); } /* static */ void FetchStream::RequestDataCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, size_t aDesiredSize) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream)); RefPtr stream = static_cast(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting || stream->mState == eChecking || stream->mState == eReading); if (stream->mState == eReading) { // We are already reading data. return; } if (stream->mState == eChecking) { // If we are looking for more data, there is nothing else we should do: // let's move this checking operation in a reading. MOZ_ASSERT(stream->mInputStream); stream->mState = eReading; return; } stream->mState = eReading; if (!stream->mInputStream) { // This is the first use of the stream. Let's convert the // mOriginalInputStream into an nsIAsyncInputStream. MOZ_ASSERT(stream->mOriginalInputStream); bool nonBlocking = false; nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr asyncStream = do_QueryInterface(stream->mOriginalInputStream); if (!nonBlocking || !asyncStream) { nsCOMPtr sts = do_GetService(kStreamTransportServiceCID, &rv); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr transport; rv = sts->CreateInputTransport(stream->mOriginalInputStream, /* aCloseWhenDone */ true, getter_AddRefs(transport)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr wrapper; rv = transport->OpenInputStream(/* aFlags */ 0, /* aSegmentSize */ 0, /* aSegmentCount */ 0, getter_AddRefs(wrapper)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } asyncStream = do_QueryInterface(wrapper); } stream->mInputStream = asyncStream; stream->mOriginalInputStream = nullptr; } MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream); nsresult rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } // All good. } /* static */ void FetchStream::WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, void* aBuffer, size_t aLength, size_t* aByteWritten) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); MOZ_DIAGNOSTIC_ASSERT(aBuffer); MOZ_DIAGNOSTIC_ASSERT(aByteWritten); RefPtr stream = static_cast(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting); stream->mState = eChecking; uint32_t written; nsresult rv = stream->mInputStream->Read(static_cast(aBuffer), aLength, &written); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } *aByteWritten = written; if (written == 0) { stream->CloseAndReleaseObjects(aCx, aStream); return; } rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mOwningEventTarget); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } // All good. } /* static */ JS::Value FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, JS::HandleValue aReason) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); // This is safe because we created an extra reference in FetchStream::Create() // that won't be released until FetchStream::FinalizeCallback() is called. // We are guaranteed that won't happen until the js ReadableStream object // is finalized. FetchStream* stream = static_cast(aUnderlyingSource); if (stream->mInputStream) { stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); } stream->ReleaseObjects(); return JS::UndefinedValue(); } /* static */ void FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); } /* static */ void FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, JS::HandleValue aReason) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); // This is safe because we created an extra reference in FetchStream::Create() // that won't be released until FetchStream::FinalizeCallback() is called. // We are guaranteed that won't happen until the js ReadableStream object // is finalized. FetchStream* stream = static_cast(aUnderlyingSource); if (stream->mInputStream) { stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); } stream->ReleaseObjects(); } void FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); // This can be called in any thread. // This takes ownership of the ref created in FetchStream::Create(). RefPtr stream = dont_AddRef(static_cast(aUnderlyingSource)); stream->ReleaseObjects(); } FetchStream::FetchStream(nsIGlobalObject* aGlobal, FetchStreamHolder* aStreamHolder, nsIInputStream* aInputStream) : mState(eWaiting) , mGlobal(aGlobal) , mStreamHolder(aStreamHolder) , mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other)) , mOriginalInputStream(aInputStream) { MOZ_DIAGNOSTIC_ASSERT(aInputStream); MOZ_DIAGNOSTIC_ASSERT(aStreamHolder); } FetchStream::~FetchStream() { } void FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aError) { // Nothing to do. if (mState == eClosed) { return; } // Let's close the stream. if (aError == NS_BASE_STREAM_CLOSED) { CloseAndReleaseObjects(aCx, aStream); return; } // Let's use a generic error. RefPtr error = DOMException::Create(NS_ERROR_DOM_TYPE_ERR); JS::Rooted errorValue(aCx); if (ToJSValue(aCx, error, &errorValue)) { JS::ReadableStreamError(aCx, aStream, errorValue); } ReleaseObjects(); } NS_IMETHODIMP FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { MOZ_DIAGNOSTIC_ASSERT(aStream); // Already closed. We have nothing else to do here. if (mState == eClosed) { return NS_OK; } MOZ_DIAGNOSTIC_ASSERT(mInputStream); MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking); AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(mGlobal))) { // Without JSContext we are not able to close the stream or to propagate the // error. return NS_ERROR_FAILURE; } JSContext* cx = jsapi.cx(); JS::Rooted stream(cx, mStreamHolder->ReadableStreamBody()); uint64_t size = 0; nsresult rv = mInputStream->Available(&size); if (NS_SUCCEEDED(rv) && size == 0) { // In theory this should not happen. If size is 0, the stream should be // considered closed. rv = NS_BASE_STREAM_CLOSED; } // No warning for stream closed. if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) { ErrorPropagation(cx, stream, rv); return NS_OK; } // This extra checking is completed. Let's wait for the next read request. if (mState == eChecking) { mState = eWaiting; return NS_OK; } mState = eWriting; JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size); // The WriteInto callback changes mState to eChecking. MOZ_DIAGNOSTIC_ASSERT(mState == eChecking); return NS_OK; } /* static */ nsresult FetchStream::RetrieveInputStream(void* aUnderlyingReadableStreamSource, nsIInputStream** aInputStream) { MOZ_ASSERT(aUnderlyingReadableStreamSource); MOZ_ASSERT(aInputStream); RefPtr stream = static_cast(aUnderlyingReadableStreamSource); // if mOriginalInputStream is null, the reading already started. We don't want // to expose the internal inputStream. if (NS_WARN_IF(!stream->mOriginalInputStream)) { return NS_ERROR_DOM_INVALID_STATE_ERR; } nsCOMPtr inputStream = stream->mOriginalInputStream; inputStream.forget(aInputStream); return NS_OK; } void FetchStream::Close() { if (mState == eClosed) { return; } AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(mGlobal))) { ReleaseObjects(); return; } JSContext* cx = jsapi.cx(); JS::Rooted stream(cx, mStreamHolder->ReadableStreamBody()); CloseAndReleaseObjects(cx, stream); } void FetchStream::CloseAndReleaseObjects(JSContext* aCx, JS::HandleObject aStream) { MOZ_DIAGNOSTIC_ASSERT(mState != eClosed); ReleaseObjects(); if (JS::ReadableStreamIsReadable(aStream)) { JS::ReadableStreamClose(aCx, aStream); } } void FetchStream::ReleaseObjects() { if (mState == eClosed) { return; } mState = eClosed; if (mWorkerHolder) { RefPtr r = new FetchStreamWorkerHolderShutdown( static_cast(mWorkerHolder.get())->GetWorkerPrivate(), Move(mWorkerHolder), Move(mGlobal), Move(mStreamHolder)); r->Dispatch(); } else { RefPtr self = this; RefPtr r = NS_NewRunnableFunction( "FetchStream::ReleaseObjects", [self] () { nsCOMPtr obs = mozilla::services::GetObserverService(); if (obs) { obs->RemoveObserver(self, DOM_WINDOW_DESTROYED_TOPIC); } self->mGlobal = nullptr; self->mStreamHolder->NullifyStream(); self->mStreamHolder = nullptr; }); mOwningEventTarget->Dispatch(r.forget()); } } // nsIObserver // ----------- NS_IMETHODIMP FetchStream::Observe(nsISupports* aSubject, const char* aTopic, const char16_t* aData) { AssertIsOnMainThread(); MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0); nsCOMPtr window = do_QueryInterface(mGlobal); if (SameCOMIdentity(aSubject, window)) { Close(); } return NS_OK; } } // dom namespace } // mozilla namespace