/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "BodyStream.h"
#include "mozilla/dom/DOMException.h"
#include "mozilla/dom/WorkerCommon.h"
#include "mozilla/dom/WorkerPrivate.h"
#include "mozilla/dom/WorkerRunnable.h"
#include "nsProxyRelease.h"
#include "nsStreamUtils.h"

static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);

namespace mozilla {
namespace dom {

// BodyStreamHolder
// ---------------------------------------------------------------------------

NS_IMPL_CYCLE_COLLECTION_CLASS(BodyStreamHolder)

NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(BodyStreamHolder)
NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END

NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(BodyStreamHolder)
NS_IMPL_CYCLE_COLLECTION_UNLINK_END

NS_IMPL_CYCLE_COLLECTING_ADDREF(BodyStreamHolder)
NS_IMPL_CYCLE_COLLECTING_RELEASE(BodyStreamHolder)

NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(BodyStreamHolder)
  NS_INTERFACE_MAP_ENTRY(nsISupports)
NS_INTERFACE_MAP_END

// BodyStream
// ---------------------------------------------------------------------------

// Control runnable used by ReleaseObjects() when it is invoked on a thread
// that is neither the main thread nor the current worker thread: it calls
// ReleaseObjects() again from WorkerRun(). It keeps the stream alive through
// a strong reference until it has run.
class BodyStream::WorkerShutdown final : public WorkerControlRunnable {
 public:
  WorkerShutdown(WorkerPrivate* aWorkerPrivate, RefPtr<BodyStream> aStream)
      : WorkerControlRunnable(aWorkerPrivate, WorkerThreadUnchangedBusyCount),
        mStream(aStream) {}

  bool WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override {
    mStream->ReleaseObjects();
    return true;
  }

  // This runnable starts from a JS Thread. We need to disable a couple of
  // assertions by overriding the following methods.

  bool PreDispatch(WorkerPrivate* aWorkerPrivate) override { return true; }

  void PostDispatch(WorkerPrivate* aWorkerPrivate,
                    bool aDispatchResult) override {}

 private:
  RefPtr<BodyStream> mStream;
};

NS_IMPL_ISUPPORTS(BodyStream, nsIInputStreamCallback, nsIObserver,
                  nsISupportsWeakReference)

// Creates a BodyStream wrapping aInputStream, builds the JS readable stream
// object backed by it and stores that object in aStreamHolder.
//
// On the main thread the new stream is registered (weakly) as an observer of
// DOM_WINDOW_DESTROYED_TOPIC; on any other thread a WeakWorkerRef is created
// whose callback closes the stream. Failures are reported through aRv and
// leave aStreamHolder without a body.
/* static */
void BodyStream::Create(JSContext* aCx, BodyStreamHolder* aStreamHolder,
                        nsIGlobalObject* aGlobal, nsIInputStream* aInputStream,
                        ErrorResult& aRv) {
  MOZ_DIAGNOSTIC_ASSERT(aCx);
  MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
  MOZ_DIAGNOSTIC_ASSERT(aInputStream);

  RefPtr<BodyStream> stream =
      new BodyStream(aGlobal, aStreamHolder, aInputStream);

  if (NS_IsMainThread()) {
    nsCOMPtr<nsIObserverService> os = mozilla::services::GetObserverService();
    if (NS_WARN_IF(!os)) {
      aRv.Throw(NS_ERROR_FAILURE);
      return;
    }

    aRv = os->AddObserver(stream, DOM_WINDOW_DESTROYED_TOPIC, true);
    if (NS_WARN_IF(aRv.Failed())) {
      return;
    }
  } else {
    WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
    MOZ_ASSERT(workerPrivate);

    RefPtr<WeakWorkerRef> workerRef =
        WeakWorkerRef::Create(workerPrivate, [stream]() { stream->Close(); });

    if (NS_WARN_IF(!workerRef)) {
      aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
      return;
    }

    // Note, this will create a ref-cycle between the holder and the stream.
    // The cycle is broken when the stream is closed or the worker begins
    // shutting down.
    stream->mWorkerRef = workerRef.forget();
  }

  aRv.MightThrowJSException();
  JS::Rooted<JSObject*> body(aCx, JS::NewReadableExternalSourceStreamObject(
                                      aCx, stream, aStreamHolder));
  if (!body) {
    aRv.StealExceptionFromJSContext(aCx);
    return;
  }

  // This will be released in BodyStream::FinalizeCallback(). We are
  // guaranteed the jsapi will call FinalizeCallback when ReadableStream
  // js object is finalized.
  NS_ADDREF(stream.get());

  aStreamHolder->SetReadableStreamBody(body);
}

// Underlying-source callback: starts (or continues) reading.
//
// State handling, under mMutex:
//  - eReading:      nothing to do, a read is already pending.
//  - eChecking:     promote the pending "check" wait into a read.
//  - eInitializing: first use; the holder is marked as read.
//  - otherwise:     switch to eReading and AsyncWait() on the input stream,
//                   converting mOriginalInputStream into an async stream
//                   first if that has not happened yet.
void BodyStream::requestData(JSContext* aCx, JS::HandleObject aStream,
                             size_t aDesiredSize) {
#if MOZ_DIAGNOSTIC_ASSERT_ENABLED
  bool disturbed;
  if (!JS::ReadableStreamIsDisturbed(aCx, aStream, &disturbed)) {
    JS_ClearPendingException(aCx);
  } else {
    MOZ_DIAGNOSTIC_ASSERT(disturbed);
  }
#endif

  AssertIsOnOwningThread();

  MutexAutoLock lock(mMutex);

  MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
                        mState == eChecking || mState == eReading);

  if (mState == eReading) {
    // We are already reading data.
    return;
  }

  if (mState == eChecking) {
    // If we are looking for more data, there is nothing else we should do:
    // let's move this checking operation in a reading.
    MOZ_ASSERT(mInputStream);
    mState = eReading;
    return;
  }

  if (mState == eInitializing) {
    // The stream has been used for the first time.
    mStreamHolder->MarkAsRead();
  }

  mState = eReading;

  if (!mInputStream) {
    // This is the first use of the stream. Let's convert the
    // mOriginalInputStream into an nsIAsyncInputStream.
    MOZ_ASSERT(mOriginalInputStream);

    nsCOMPtr<nsIAsyncInputStream> asyncStream;
    nsresult rv = NS_MakeAsyncNonBlockingInputStream(
        mOriginalInputStream.forget(), getter_AddRefs(asyncStream));
    if (NS_WARN_IF(NS_FAILED(rv))) {
      ErrorPropagation(aCx, lock, aStream, rv);
      return;
    }

    mInputStream = asyncStream;
    mOriginalInputStream = nullptr;
  }

  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(!mOriginalInputStream);

  nsresult rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  // All good.
}

// Underlying-source callback: copies up to aLength bytes from mInputStream
// into aBuffer and reports the number of bytes copied through aByteWritten.
//
// Expects mState == eWriting and moves it to eChecking. A zero-byte read
// closes the stream; otherwise another AsyncWait() is scheduled so that
// OnInputStreamReady() can check for more data. On a read error
// *aByteWritten is 0 and the error is propagated to the JS stream.
void BodyStream::writeIntoReadRequestBuffer(JSContext* aCx,
                                            JS::HandleObject aStream,
                                            void* aBuffer, size_t aLength,
                                            size_t* aByteWritten) {
  MOZ_DIAGNOSTIC_ASSERT(aBuffer);
  MOZ_DIAGNOSTIC_ASSERT(aByteWritten);

  // Give the out-param a defined value up front so the caller never observes
  // an unset value if we bail out on the Read() error path below.
  *aByteWritten = 0;

  AssertIsOnOwningThread();

  MutexAutoLock lock(mMutex);

  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(mState == eWriting);
  mState = eChecking;

  uint32_t written;
  nsresult rv =
      mInputStream->Read(static_cast<char*>(aBuffer), aLength, &written);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  *aByteWritten = written;

  if (written == 0) {
    CloseAndReleaseObjects(aCx, lock, aStream);
    return;
  }

  rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
  if (NS_WARN_IF(NS_FAILED(rv))) {
    ErrorPropagation(aCx, lock, aStream, rv);
    return;
  }

  // All good.
}

// Underlying-source callback: the stream has been cancelled. Closes whichever
// input stream we currently hold (async or original), releases our objects
// and returns undefined.
JS::Value BodyStream::cancel(JSContext* aCx, JS::HandleObject aStream,
                             JS::HandleValue aReason) {
  AssertIsOnOwningThread();

  if (mState == eInitializing) {
    // The stream has been used for the first time.
    mStreamHolder->MarkAsRead();
  }

  if (mInputStream) {
    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
  }

  // It could be that we don't have mInputStream yet, but we still have the
  // original stream. We need to close that too.
  if (mOriginalInputStream) {
    MOZ_ASSERT(!mInputStream);
    mOriginalInputStream->Close();
  }

  ReleaseObjects();
  return JS::UndefinedValue();
}

// Underlying-source callback: intentionally a no-op.
void BodyStream::onClosed(JSContext* aCx, JS::HandleObject aStream) {}

// Underlying-source callback: the stream has errored. Closes the async input
// stream, if any, and releases our objects.
void BodyStream::onErrored(JSContext* aCx, JS::HandleObject aStream,
                           JS::HandleValue aReason) {
  AssertIsOnOwningThread();

  if (mState == eInitializing) {
    // The stream has been used for the first time.
    mStreamHolder->MarkAsRead();
  }

  if (mInputStream) {
    mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
  }

  ReleaseObjects();
}

// Underlying-source callback: drops the extra reference taken in Create()
// after releasing our objects.
void BodyStream::finalize() {
  // This can be called in any thread.

  // This takes ownership of the ref created in BodyStream::Create().
  RefPtr<BodyStream> stream = dont_AddRef(this);

  stream->ReleaseObjects();
}

// Starts in eInitializing, holding aInputStream as mOriginalInputStream; the
// async wrapper (mInputStream) is only created on the first requestData().
BodyStream::BodyStream(nsIGlobalObject* aGlobal,
                       BodyStreamHolder* aStreamHolder,
                       nsIInputStream* aInputStream)
    : mMutex("BodyStream::mMutex"),
      mState(eInitializing),
      mGlobal(aGlobal),
      mStreamHolder(aStreamHolder),
      mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other)),
      mOriginalInputStream(aInputStream) {
  MOZ_DIAGNOSTIC_ASSERT(aInputStream);
  MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
}

BodyStream::~BodyStream() {}

// Reports aError to the JS stream and releases our objects. Must be called
// with mMutex held (aProofOfLock).
//
// NS_BASE_STREAM_CLOSED is treated as a normal close. Any other failure is
// surfaced to JS as a generic TypeError DOMException. The mutex is dropped
// while calling into JS::ReadableStreamError() and re-acquired before
// ReleaseObjects().
void BodyStream::ErrorPropagation(JSContext* aCx,
                                  const MutexAutoLock& aProofOfLock,
                                  JS::HandleObject aStream, nsresult aError) {
  AssertIsOnOwningThread();

  // Nothing to do.
  if (mState == eClosed) {
    return;
  }

  // Let's close the stream.
  if (aError == NS_BASE_STREAM_CLOSED) {
    CloseAndReleaseObjects(aCx, aProofOfLock, aStream);
    return;
  }

  // Let's use a generic error.
  RefPtr<DOMException> error = DOMException::Create(NS_ERROR_DOM_TYPE_ERR);

  JS::Rooted<JS::Value> errorValue(aCx);
  if (ToJSValue(aCx, error, &errorValue)) {
    MutexAutoUnlock unlock(mMutex);
    JS::ReadableStreamError(aCx, aStream, errorValue);
  }

  ReleaseObjects(aProofOfLock);
}

// nsIInputStreamCallback: the async input stream has data, has been closed or
// has failed.
//
//  - Closed/failed/0 bytes available: propagate through ErrorPropagation().
//  - eChecking: the post-write check is done; go to eWaiting until the next
//    requestData().
//  - eReading:  go to eWriting and, with the mutex released, tell the JS
//    stream how much data is available.
NS_IMETHODIMP
BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
  AssertIsOnOwningThread();
  MOZ_DIAGNOSTIC_ASSERT(aStream);

  MutexAutoLock lock(mMutex);

  // Already closed. We have nothing else to do here.
  if (mState == eClosed) {
    return NS_OK;
  }

  MOZ_DIAGNOSTIC_ASSERT(mInputStream);
  MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking);

  AutoJSAPI jsapi;
  if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
    // Without JSContext we are not able to close the stream or to propagate the
    // error.
    return NS_ERROR_FAILURE;
  }

  JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
  if (!streamObj) {
    return NS_ERROR_FAILURE;
  }

  JSContext* cx = jsapi.cx();
  JS::Rooted<JSObject*> stream(cx, streamObj);

  uint64_t size = 0;
  nsresult rv = mInputStream->Available(&size);
  if (NS_SUCCEEDED(rv) && size == 0) {
    // In theory this should not happen. If size is 0, the stream should be
    // considered closed.
    rv = NS_BASE_STREAM_CLOSED;
  }

  // No warning for stream closed.
  if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
    ErrorPropagation(cx, lock, stream, rv);
    return NS_OK;
  }

  // This extra checking is completed. Let's wait for the next read request.
  if (mState == eChecking) {
    mState = eWaiting;
    return NS_OK;
  }

  mState = eWriting;

  {
    MutexAutoUnlock unlock(mMutex);
    DebugOnly<bool> ok =
        JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size);

    // The WriteInto callback changes mState to eChecking.
    MOZ_ASSERT_IF(ok, mState == eChecking);
  }

  return NS_OK;
}

// Hands back the original (not yet consumed) input stream of the BodyStream
// behind aUnderlyingReadableStreamSource. Fails with
// NS_ERROR_DOM_INVALID_STATE_ERR once reading has started, i.e. once
// mOriginalInputStream has been cleared.
/* static */
nsresult BodyStream::RetrieveInputStream(
    JS::ReadableStreamUnderlyingSource* aUnderlyingReadableStreamSource,
    nsIInputStream** aInputStream) {
  MOZ_ASSERT(aUnderlyingReadableStreamSource);
  MOZ_ASSERT(aInputStream);

  RefPtr<BodyStream> stream =
      static_cast<BodyStream*>(aUnderlyingReadableStreamSource);
  stream->AssertIsOnOwningThread();

  // if mOriginalInputStream is null, the reading already started. We don't want
  // to expose the internal inputStream.
  if (NS_WARN_IF(!stream->mOriginalInputStream)) {
    return NS_ERROR_DOM_INVALID_STATE_ERR;
  }

  nsCOMPtr<nsIInputStream> inputStream = stream->mOriginalInputStream;
  inputStream.forget(aInputStream);
  return NS_OK;
}

// Closes the JS stream (when one is still reachable through the holder and a
// JSContext can be obtained) and releases our objects. No-op when already
// closed.
void BodyStream::Close() {
  AssertIsOnOwningThread();

  MutexAutoLock lock(mMutex);

  if (mState == eClosed) {
    return;
  }

  AutoJSAPI jsapi;
  if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
    ReleaseObjects(lock);
    return;
  }

  JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
  if (streamObj) {
    JSContext* cx = jsapi.cx();
    JS::Rooted<JSObject*> stream(cx, streamObj);
    CloseAndReleaseObjects(cx, lock, stream);
  } else {
    ReleaseObjects(lock);
  }
}

// Releases our objects first (while still holding the mutex), then drops the
// mutex and closes the JS stream if it is still readable. Must be called with
// mMutex held and mState != eClosed.
void BodyStream::CloseAndReleaseObjects(JSContext* aCx,
                                        const MutexAutoLock& aProofOfLock,
                                        JS::HandleObject aStream) {
  AssertIsOnOwningThread();
  MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);

  ReleaseObjects(aProofOfLock);

  MutexAutoUnlock unlock(mMutex);
  bool readable;
  if (!JS::ReadableStreamIsReadable(aCx, aStream, &readable)) {
    return;
  }
  if (readable) {
    JS::ReadableStreamClose(aCx, aStream);
  }
}

// Convenience overload: acquires mMutex and forwards to the locked variant.
void BodyStream::ReleaseObjects() {
  MutexAutoLock lock(mMutex);
  ReleaseObjects(lock);
}

// Moves the stream to eClosed and drops the references it holds (observer
// registration, worker ref, global, stream holder). Must be called with
// mMutex held. Idempotent: returns immediately when already closed.
void BodyStream::ReleaseObjects(const MutexAutoLock& aProofOfLock) {
  // This method can be called on 2 possible threads: the owning one and a JS
  // thread used to release resources. If we are on the JS thread, we need to
  // dispatch a runnable to go back to the owning thread in order to release
  // resources correctly.

  if (mState == eClosed) {
    // Already gone. Nothing to do.
    return;
  }

  if (!NS_IsMainThread() && !IsCurrentThreadRunningWorker()) {
    // Let's dispatch a WorkerControlRunnable if the owning thread is a worker.
    if (mWorkerRef) {
      RefPtr<WorkerShutdown> r =
          new WorkerShutdown(mWorkerRef->GetUnsafePrivate(), this);
      Unused << NS_WARN_IF(!r->Dispatch());
      return;
    }

    // A normal runnable if the owning thread is the main thread.
    RefPtr<BodyStream> self = this;
    RefPtr<Runnable> r = NS_NewRunnableFunction(
        "BodyStream::ReleaseObjects", [self]() { self->ReleaseObjects(); });
    mOwningEventTarget->Dispatch(r.forget());
    return;
  }

  AssertIsOnOwningThread();

  mState = eClosed;

  if (NS_IsMainThread()) {
    nsCOMPtr<nsIObserverService> obs = mozilla::services::GetObserverService();
    if (obs) {
      obs->RemoveObserver(this, DOM_WINDOW_DESTROYED_TOPIC);
    }
  }

  JSObject* streamObj = mStreamHolder->GetReadableStreamBody();
  if (streamObj) {
    // Let's inform the JSEngine that we are going to be released.
    JS::ReadableStreamReleaseCCObject(streamObj);
  }

  mWorkerRef = nullptr;
  mGlobal = nullptr;

  mStreamHolder->NullifyStream();
  mStreamHolder = nullptr;
}

#ifdef DEBUG
void BodyStream::AssertIsOnOwningThread() {
  NS_ASSERT_OWNINGTHREAD(BodyStream);
}
#endif

// nsIObserver
// -----------

// Main-thread only: closes the stream when the window backing mGlobal is the
// subject of a DOM_WINDOW_DESTROYED_TOPIC notification.
NS_IMETHODIMP
BodyStream::Observe(nsISupports* aSubject, const char* aTopic,
                    const char16_t* aData) {
  AssertIsOnMainThread();
  AssertIsOnOwningThread();

  MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0);

  nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
  if (SameCOMIdentity(aSubject, window)) {
    Close();
  }

  return NS_OK;
}

}  // namespace dom
}  // namespace mozilla