/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "FetchStream.h" #include "nsITransport.h" #include "nsIStreamTransportService.h" #include "nsProxyRelease.h" #include "WorkerPrivate.h" #include "Workers.h" #define FETCH_STREAM_FLAG 0 static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID); namespace mozilla { namespace dom { using namespace workers; namespace { class FetchStreamWorkerHolder final : public WorkerHolder { public: explicit FetchStreamWorkerHolder(FetchStream* aStream) : WorkerHolder(WorkerHolder::Behavior::AllowIdleShutdownStart) , mStream(aStream) , mWasNotified(false) {} bool Notify(Status aStatus) override { if (!mWasNotified) { mWasNotified = true; mStream->Close(); } return true; } WorkerPrivate* GetWorkerPrivate() const { return mWorkerPrivate; } private: RefPtr mStream; bool mWasNotified; }; class FetchStreamWorkerHolderShutdown final : public WorkerControlRunnable { public: FetchStreamWorkerHolderShutdown(WorkerPrivate* aWorkerPrivate, UniquePtr&& aHolder, nsCOMPtr&& aGlobal) : WorkerControlRunnable(aWorkerPrivate) , mHolder(Move(aHolder)) , mGlobal(Move(aGlobal)) {} bool WorkerRun(JSContext* aCx, WorkerPrivate* aWorkerPrivate) override { mHolder = nullptr; mGlobal = nullptr; return true; } private: UniquePtr mHolder; nsCOMPtr mGlobal; }; } // anonymous NS_IMPL_ISUPPORTS(FetchStream, nsIInputStreamCallback, nsIObserver, nsISupportsWeakReference) /* static */ JSObject* FetchStream::Create(JSContext* aCx, nsIGlobalObject* aGlobal, nsIInputStream* aInputStream, ErrorResult& aRv) { MOZ_DIAGNOSTIC_ASSERT(aCx); MOZ_DIAGNOSTIC_ASSERT(aInputStream); RefPtr stream = new FetchStream(aGlobal, aInputStream); if (NS_IsMainThread()) { nsCOMPtr os = mozilla::services::GetObserverService(); if (NS_WARN_IF(!os)) { aRv.Throw(NS_ERROR_FAILURE); return nullptr; } aRv = os->AddObserver(stream, DOM_WINDOW_DESTROYED_TOPIC, true); if (NS_WARN_IF(aRv.Failed())) { return nullptr; } } else { WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx); MOZ_ASSERT(workerPrivate); UniquePtr holder( new FetchStreamWorkerHolder(stream)); if (NS_WARN_IF(!holder->HoldWorker(workerPrivate, Closing))) { aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR); return nullptr; } // Note, this will create a ref-cycle between the holder and the stream. // The cycle is broken when the stream is closed or the worker begins // shutting down. stream->mWorkerHolder = Move(holder); } if (!JS::HasReadableStreamCallbacks(aCx)) { JS::SetReadableStreamCallbacks(aCx, &FetchStream::RequestDataCallback, &FetchStream::WriteIntoReadRequestCallback, &FetchStream::CancelCallback, &FetchStream::ClosedCallback, &FetchStream::ErroredCallback, &FetchStream::FinalizeCallback); } JS::Rooted body(aCx, JS::NewReadableExternalSourceStreamObject(aCx, stream, FETCH_STREAM_FLAG)); if (!body) { aRv.StealExceptionFromJSContext(aCx); return nullptr; } stream->mReadableStream = body; // JS engine will call the finalize callback. NS_ADDREF(stream.get()); return body; } /* static */ void FetchStream::RequestDataCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, size_t aDesiredSize) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); MOZ_DIAGNOSTIC_ASSERT(JS::ReadableStreamIsDisturbed(aStream)); RefPtr stream = static_cast(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWaiting || stream->mState == eChecking); if (stream->mState == eChecking) { // If we are looking for more data, there is nothing else we should do: // let's move this checking operation in a reading. MOZ_ASSERT(stream->mInputStream); stream->mState = eReading; return; } stream->mState = eReading; if (!stream->mInputStream) { // This is the first use of the stream. Let's convert the // mOriginalInputStream into an nsIAsyncInputStream. MOZ_ASSERT(stream->mOriginalInputStream); bool nonBlocking = false; nsresult rv = stream->mOriginalInputStream->IsNonBlocking(&nonBlocking); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr asyncStream = do_QueryInterface(stream->mOriginalInputStream); if (!nonBlocking || !asyncStream) { nsCOMPtr sts = do_GetService(kStreamTransportServiceCID, &rv); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr transport; rv = sts->CreateInputTransport(stream->mOriginalInputStream, /* aStartOffset */ 0, /* aReadLimit */ -1, /* aCloseWhenDone */ true, getter_AddRefs(transport)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } nsCOMPtr wrapper; rv = transport->OpenInputStream(/* aFlags */ 0, /* aSegmentSize */ 0, /* aSegmentCount */ 0, getter_AddRefs(wrapper)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } asyncStream = do_QueryInterface(wrapper); } stream->mInputStream = asyncStream; stream->mOriginalInputStream = nullptr; } MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); MOZ_DIAGNOSTIC_ASSERT(!stream->mOriginalInputStream); nsresult rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mGlobal->EventTargetFor(TaskCategory::Other)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } // All good. } /* static */ void FetchStream::WriteIntoReadRequestCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, void* aBuffer, size_t aLength, size_t* aByteWritten) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); MOZ_DIAGNOSTIC_ASSERT(aBuffer); MOZ_DIAGNOSTIC_ASSERT(aByteWritten); RefPtr stream = static_cast(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(stream->mInputStream); MOZ_DIAGNOSTIC_ASSERT(stream->mState == eWriting); stream->mState = eChecking; uint32_t written; nsresult rv = stream->mInputStream->Read(static_cast(aBuffer), aLength, &written); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } *aByteWritten = written; if (written == 0) { stream->mState = eClosed; JS::ReadableStreamClose(aCx, aStream); return; } rv = stream->mInputStream->AsyncWait(stream, 0, 0, stream->mGlobal->EventTargetFor(TaskCategory::Other)); if (NS_WARN_IF(NS_FAILED(rv))) { stream->ErrorPropagation(aCx, aStream, rv); return; } // All good. } /* static */ JS::Value FetchStream::CancelCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, JS::HandleValue aReason) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); RefPtr stream = static_cast(aUnderlyingSource); if (stream->mInputStream) { stream->mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED); } stream->mState = eClosed; return JS::UndefinedValue(); } /* static */ void FetchStream::ClosedCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); } /* static */ void FetchStream::ErroredCallback(JSContext* aCx, JS::HandleObject aStream, void* aUnderlyingSource, uint8_t aFlags, JS::HandleValue aReason) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); } void FetchStream::FinalizeCallback(void* aUnderlyingSource, uint8_t aFlags) { MOZ_DIAGNOSTIC_ASSERT(aUnderlyingSource); MOZ_DIAGNOSTIC_ASSERT(aFlags == FETCH_STREAM_FLAG); // This can be called in any thread. RefPtr stream = dont_AddRef(static_cast(aUnderlyingSource)); if (stream->mState == eClosed) { return; } stream->CloseAndReleaseObjects(); } FetchStream::FetchStream(nsIGlobalObject* aGlobal, nsIInputStream* aInputStream) : mState(eWaiting) , mGlobal(aGlobal) , mOriginalInputStream(aInputStream) , mOwningEventTarget(mGlobal->EventTargetFor(TaskCategory::Other)) , mReadableStream(nullptr) { MOZ_DIAGNOSTIC_ASSERT(aInputStream); } FetchStream::~FetchStream() { } void FetchStream::ErrorPropagation(JSContext* aCx, JS::HandleObject aStream, nsresult aError) { // Nothing to do. if (mState == eClosed) { return; } // We cannot continue with any other operation. mState = eClosed; // Let's close the stream. if (aError == NS_BASE_STREAM_CLOSED) { JS::ReadableStreamClose(aCx, aStream); return; } nsCOMPtr window = do_QueryInterface(mGlobal); // Let's use a generic error. RefPtr error = new DOMError(window, NS_ERROR_DOM_TYPE_ERR); JS::Rooted errorValue(aCx); if (ToJSValue(aCx, error, &errorValue)) { JS::ReadableStreamError(aCx, aStream, errorValue); } } NS_IMETHODIMP FetchStream::OnInputStreamReady(nsIAsyncInputStream* aStream) { MOZ_DIAGNOSTIC_ASSERT(aStream); // Already closed. We have nothing else to do here. if (mState == eClosed) { return NS_OK; } MOZ_DIAGNOSTIC_ASSERT(mInputStream); MOZ_DIAGNOSTIC_ASSERT(mState == eReading || mState == eChecking); AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(mGlobal))) { // Without JSContext we are not able to close the stream or to propagate the // error. return NS_ERROR_FAILURE; } JSContext* cx = jsapi.cx(); JS::Rooted stream(cx, mReadableStream); uint64_t size = 0; nsresult rv = mInputStream->Available(&size); if (NS_SUCCEEDED(rv) && size == 0) { // In theory this should not happen. If size is 0, the stream should be // considered closed. rv = NS_BASE_STREAM_CLOSED; } // No warning for stream closed. if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) { ErrorPropagation(cx, stream, rv); return NS_OK; } // This extra checking is completed. Let's wait for the next read request. if (mState == eChecking) { mState = eWaiting; return NS_OK; } mState = eWriting; JS::ReadableStreamUpdateDataAvailableFromSource(cx, stream, size); // The WriteInto callback changes mState to eChecking. MOZ_DIAGNOSTIC_ASSERT(mState == eChecking); return NS_OK; } void FetchStream::Close() { if (mState == eClosed) { return; } AutoJSAPI jsapi; if (NS_WARN_IF(!jsapi.Init(mGlobal))) { return; } JSContext* cx = jsapi.cx(); JS::Rooted stream(cx, mReadableStream); JS::ReadableStreamClose(cx, stream); CloseAndReleaseObjects(); } void FetchStream::CloseAndReleaseObjects() { MOZ_DIAGNOSTIC_ASSERT(mState != eClosed); mState = eClosed; if (mWorkerHolder) { RefPtr r = new FetchStreamWorkerHolderShutdown( static_cast(mWorkerHolder.get())->GetWorkerPrivate(), Move(mWorkerHolder), Move(mGlobal)); r->Dispatch(); } else { RefPtr self = this; RefPtr r = NS_NewRunnableFunction( "FetchStream::Finalize", [self] () { nsCOMPtr os = mozilla::services::GetObserverService(); if (os) { os->RemoveObserver(self, DOM_WINDOW_DESTROYED_TOPIC); } self->mGlobal = nullptr; }); mGlobal->Dispatch("FetchStream::FinalizeCallback", TaskCategory::Other, r.forget()); } } // nsIObserver // ----------- NS_IMETHODIMP FetchStream::Observe(nsISupports* aSubject, const char* aTopic, const char16_t* aData) { AssertIsOnMainThread(); MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0); nsCOMPtr window = do_QueryInterface(mGlobal); if (SameCOMIdentity(aSubject, window)) { Close(); } return NS_OK; } } // dom namespace } // mozilla namespace