Bug 1619953 - P3 - Asynchronous BlobURLInputStream added r=baku

Reading from this stream type should be the preferred way to consume data from blob url. When created in the parent process, it requests blob data locally from BlobURLProtocolHandler. When created in a content process, it makes BlobURLDataRequest IPC call to the parent process. Should be wrapped in a nsBufferedInputStream, because ReadSegments() is currently not implemented for this stream type.

Differential Revision: https://phabricator.services.mozilla.com/D75292
This commit is contained in:
ssengupta 2020-08-05 17:05:56 +00:00
Родитель 562ceea795
Коммит f600911e45
3 изменённых файлов: 620 добавлений и 0 удалений

Просмотреть файл

@ -0,0 +1,535 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "BlobURLInputStream.h"
#include "mozilla/dom/ContentChild.h"
#include "mozilla/dom/IPCBlobUtils.h"
#include "nsStreamUtils.h"
namespace mozilla {
namespace dom {
NS_IMPL_ADDREF(BlobURLInputStream);
NS_IMPL_RELEASE(BlobURLInputStream);
NS_INTERFACE_MAP_BEGIN(BlobURLInputStream)
NS_INTERFACE_MAP_ENTRY(nsIInputStream)
NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
NS_INTERFACE_MAP_ENTRY(nsIInputStreamLength)
NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStreamLength)
NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
NS_INTERFACE_MAP_END
/* static */
already_AddRefed<nsIInputStream> BlobURLInputStream::Create(
BlobURLChannel* const aChannel, BlobURL* const aBlobURL) {
MOZ_ASSERT(NS_IsMainThread());
if (NS_WARN_IF(!aChannel) || NS_WARN_IF(!aBlobURL)) {
return nullptr;
}
nsAutoCString spec;
nsresult rv = aBlobURL->GetSpec(spec);
if (NS_WARN_IF(NS_FAILED(rv))) {
return nullptr;
}
return MakeAndAddRef<BlobURLInputStream>(aChannel, spec);
}
// from nsIInputStream interface
NS_IMETHODIMP BlobURLInputStream::Close() {
return CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
NS_IMETHODIMP BlobURLInputStream::Available(uint64_t* aLength) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::ERROR) {
MOZ_ASSERT(NS_FAILED(mError));
return mError;
}
if (mState == State::CLOSED) {
return NS_BASE_STREAM_CLOSED;
}
if (mState == State::READY) {
MOZ_ASSERT(mAsyncInputStream);
return mAsyncInputStream->Available(aLength);
}
return NS_BASE_STREAM_WOULD_BLOCK;
}
NS_IMETHODIMP BlobURLInputStream::Read(char* aBuffer, uint32_t aCount,
uint32_t* aReadCount) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::ERROR) {
MOZ_ASSERT(NS_FAILED(mError));
return mError;
}
// Read() should not return NS_BASE_STREAM_CLOSED if stream is closed.
// A read count of 0 should indicate closed or consumed stream.
// See:
// https://searchfox.org/mozilla-central/rev/559b25eb41c1cbffcb90a34e008b8288312fcd25/xpcom/io/nsIInputStream.idl#104
if (mState == State::CLOSED) {
*aReadCount = 0;
return NS_OK;
}
if (mState == State::READY) {
MOZ_ASSERT(mAsyncInputStream);
nsresult rv = mAsyncInputStream->Read(aBuffer, aCount, aReadCount);
if (NS_SUCCEEDED(rv) && aReadCount && !*aReadCount) {
mState = State::CLOSED;
ReleaseUnderlyingStream(lock);
}
return rv;
}
return NS_BASE_STREAM_WOULD_BLOCK;
}
NS_IMETHODIMP BlobURLInputStream::ReadSegments(nsWriteSegmentFun aWriter,
void* aClosure, uint32_t aCount,
uint32_t* aResult) {
// This means the caller will have to wrap the stream in an
// nsBufferedInputStream in order to use ReadSegments
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP BlobURLInputStream::IsNonBlocking(bool* aNonBlocking) {
*aNonBlocking = true;
return NS_OK;
}
// from nsIAsyncInputStream interface
NS_IMETHODIMP BlobURLInputStream::CloseWithStatus(nsresult aStatus) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::READY) {
MOZ_ASSERT(mAsyncInputStream);
mAsyncInputStream->CloseWithStatus(aStatus);
}
mState = State::CLOSED;
ReleaseUnderlyingStream(lock);
return NS_OK;
}
NS_IMETHODIMP BlobURLInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
uint32_t aFlags,
uint32_t aRequestedCount,
nsIEventTarget* aEventTarget) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::ERROR) {
MOZ_ASSERT(NS_FAILED(mError));
return NS_ERROR_FAILURE;
}
// Pre-empting a valid callback with another is not allowed.
if (NS_WARN_IF(mAsyncWaitCallback && aCallback)) {
return NS_ERROR_FAILURE;
}
mAsyncWaitTarget = aEventTarget;
mAsyncWaitRequestedCount = aRequestedCount;
mAsyncWaitFlags = aFlags;
mAsyncWaitCallback = aCallback;
if (mState == State::INITIAL) {
mState = State::WAITING;
// RetrieveBlobData will execute NotifyWWaitTarget() when retrieve succeeds
// or fails
if (NS_IsMainThread()) {
RetrieveBlobData(lock);
return NS_OK;
}
nsCOMPtr<nsIRunnable> runnable = mozilla::NewRunnableMethod(
"BlobURLInputStream::CallRetrieveBlobData", this,
&BlobURLInputStream::CallRetrieveBlobData);
NS_DispatchToMainThread(runnable.forget(), NS_DISPATCH_NORMAL);
return NS_OK;
}
if (mState == State::WAITING) {
// RetrieveBlobData is already in progress and will execute
// NotifyWaitTargets when retrieve succeeds or fails
return NS_OK;
}
if (mState == State::READY) {
// Ask the blob's input stream if reading is possible or not
return mAsyncInputStream->AsyncWait(
mAsyncWaitCallback ? this : nullptr, mAsyncWaitFlags,
mAsyncWaitRequestedCount, mAsyncWaitTarget);
}
MOZ_ASSERT(mState == State::CLOSED);
NotifyWaitTargets(lock);
return NS_OK;
}
// from nsIInputStreamLength interface
NS_IMETHODIMP BlobURLInputStream::Length(int64_t* aLength) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::CLOSED) {
return NS_BASE_STREAM_CLOSED;
}
if (mState == State::ERROR) {
MOZ_ASSERT(NS_FAILED(mError));
return NS_ERROR_FAILURE;
}
if (mState == State::READY) {
*aLength = mBlobSize;
return NS_OK;
}
return NS_BASE_STREAM_WOULD_BLOCK;
}
// from nsIAsyncInputStreamLength interface
NS_IMETHODIMP BlobURLInputStream::AsyncLengthWait(
nsIInputStreamLengthCallback* aCallback, nsIEventTarget* aEventTarget) {
MutexAutoLock lock(mStateMachineMutex);
if (mState == State::ERROR) {
MOZ_ASSERT(NS_FAILED(mError));
return mError;
}
// Pre-empting a valid callback with another is not allowed.
if (mAsyncLengthWaitCallback && aCallback) {
return NS_ERROR_FAILURE;
}
mAsyncLengthWaitTarget = aEventTarget;
mAsyncLengthWaitCallback = aCallback;
if (mState == State::INITIAL) {
mState = State::WAITING;
// RetrieveBlobData will execute NotifyWWaitTarget() when retrieve succeeds
// or fails
if (NS_IsMainThread()) {
RetrieveBlobData(lock);
return NS_OK;
}
nsCOMPtr<nsIRunnable> runnable = mozilla::NewRunnableMethod(
"BlobURLInputStream::CallRetrieveBlobData", this,
&BlobURLInputStream::CallRetrieveBlobData);
NS_DispatchToMainThread(runnable.forget(), NS_DISPATCH_NORMAL);
return NS_OK;
}
if (mState == State::WAITING) {
// RetrieveBlobData is already in progress and will execute
// NotifyWaitTargets when retrieve succeeds or fails
return NS_OK;
}
// Since here the state must be READY (in which case the size of the blob is
// already known) or CLOSED, callback can be called immediately
NotifyWaitTargets(lock);
return NS_OK;
}
// from nsIInputStreamCallback interface
NS_IMETHODIMP BlobURLInputStream::OnInputStreamReady(
nsIAsyncInputStream* aStream) {
nsCOMPtr<nsIInputStreamCallback> callback;
{
MutexAutoLock lock(mStateMachineMutex);
MOZ_ASSERT_IF(mAsyncInputStream, aStream == mAsyncInputStream);
// aborted in the meantime
if (!mAsyncWaitCallback) {
return NS_OK;
}
mAsyncWaitCallback.swap(callback);
mAsyncWaitTarget = nullptr;
}
MOZ_ASSERT(callback);
return callback->OnInputStreamReady(this);
}
// from nsIInputStreamLengthCallback interface
NS_IMETHODIMP BlobURLInputStream::OnInputStreamLengthReady(
nsIAsyncInputStreamLength* aStream, int64_t aLength) {
nsCOMPtr<nsIInputStreamLengthCallback> callback;
{
MutexAutoLock lock(mStateMachineMutex);
// aborted in the meantime
if (!mAsyncLengthWaitCallback) {
return NS_OK;
}
mAsyncLengthWaitCallback.swap(callback);
mAsyncLengthWaitCallback = nullptr;
}
return callback->OnInputStreamLengthReady(this, aLength);
}
// private:
BlobURLInputStream::~BlobURLInputStream() {
if (mChannel) {
NS_ReleaseOnMainThread("BlobURLInputStream::mChannel", mChannel.forget());
}
}
BlobURLInputStream::BlobURLInputStream(BlobURLChannel* const aChannel,
nsACString& aBlobURLSpec)
: mChannel(aChannel),
mBlobURLSpec(std::move(aBlobURLSpec)),
mStateMachineMutex("BlobURLInputStream::mStateMachineMutex"),
mState(State::INITIAL),
mError(NS_OK),
mBlobSize(-1),
mAsyncWaitFlags(),
mAsyncWaitRequestedCount() {}
void BlobURLInputStream::WaitOnUnderlyingStream(
const MutexAutoLock& aProofOfLock) {
if (mAsyncWaitCallback || mAsyncWaitTarget) {
// AsyncWait should be called on the underlying stream
mAsyncInputStream->AsyncWait(mAsyncWaitCallback ? this : nullptr,
mAsyncWaitFlags, mAsyncWaitRequestedCount,
mAsyncWaitTarget);
}
if (mAsyncLengthWaitCallback || mAsyncLengthWaitTarget) {
// AsyncLengthWait should be called on the underlying stream
nsCOMPtr<nsIAsyncInputStreamLength> asyncStreamLength =
do_QueryInterface(mAsyncInputStream);
if (asyncStreamLength) {
asyncStreamLength->AsyncLengthWait(
mAsyncLengthWaitCallback ? this : nullptr, mAsyncLengthWaitTarget);
}
}
}
void BlobURLInputStream::CallRetrieveBlobData() {
MutexAutoLock lock(mStateMachineMutex);
RetrieveBlobData(lock);
}
void BlobURLInputStream::RetrieveBlobData(const MutexAutoLock& aProofOfLock) {
MOZ_ASSERT(NS_IsMainThread(), "Only call on main thread");
nsCOMPtr<nsILoadInfo> loadInfo = mChannel->LoadInfo();
MOZ_ASSERT(mState == State::WAITING);
auto cleanupOnEarlyExit = MakeScopeExit([&] {
mState = State::ERROR;
mError = NS_ERROR_FAILURE;
NS_ReleaseOnMainThread("BlobURLInputStream::mChannel", mChannel.forget());
NotifyWaitTargets(aProofOfLock);
});
nsIPrincipal* triggeringPrincipal;
if (NS_WARN_IF(
NS_FAILED(loadInfo->GetTriggeringPrincipal(&triggeringPrincipal))) ||
!triggeringPrincipal) {
NS_WARNING("Failed to get owning channel's triggering principal");
return;
}
if (XRE_IsParentProcess()) {
nsIPrincipal* const dataEntryPrincipal =
BlobURLProtocolHandler::GetDataEntryPrincipal(mBlobURLSpec);
MOZ_ASSERT(dataEntryPrincipal);
if (NS_WARN_IF(!triggeringPrincipal->Subsumes(dataEntryPrincipal))) {
return;
}
RefPtr<BlobImpl> blobImpl;
nsresult rv =
NS_GetBlobForBlobURISpec(mBlobURLSpec, getter_AddRefs(blobImpl));
if (NS_WARN_IF(NS_FAILED(rv)) || (NS_WARN_IF(!blobImpl))) {
return;
}
if (NS_WARN_IF(
NS_FAILED(StoreBlobImplStream(blobImpl.forget(), aProofOfLock)))) {
return;
}
mState = State::READY;
// By design, execution can only reach here when a caller has called
// AsyncWait or AsyncLengthWait on this stream. The underlying stream is
// valid, but the caller should not be informed until that stream has data
// to read or it is closed.
WaitOnUnderlyingStream(aProofOfLock);
cleanupOnEarlyExit.release();
return;
}
ContentChild* contentChild{ContentChild::GetSingleton()};
MOZ_ASSERT(contentChild);
const RefPtr<BlobURLInputStream> self = this;
cleanupOnEarlyExit.release();
contentChild->SendBlobURLDataRequest(mBlobURLSpec, triggeringPrincipal)
->Then(
GetCurrentSerialEventTarget(), __func__,
[self](const BlobURLDataRequestResult& aResult) {
MutexAutoLock lock(self->mStateMachineMutex);
if (aResult.type() == BlobURLDataRequestResult::TIPCBlob) {
if (self->mState == State::WAITING) {
RefPtr<BlobImpl> blobImpl =
IPCBlobUtils::Deserialize(aResult.get_IPCBlob());
if (blobImpl && self->StoreBlobImplStream(blobImpl.forget(),
lock) == NS_OK) {
self->mState = State::READY;
// By design, execution can only reach here when a caller has
// called AsyncWait or AsyncLengthWait on this stream. The
// underlying stream is valid, but the caller should not be
// informed until that stream has data to read or it is
// closed.
self->WaitOnUnderlyingStream(lock);
return;
}
} else {
MOZ_ASSERT(self->mState == State::CLOSED);
// Callback can be called immediately
self->NotifyWaitTargets(lock);
return;
}
}
NS_WARNING("Blob data was not retrieved!");
self->mState = State::ERROR;
self->mError = aResult.type() == BlobURLDataRequestResult::Tnsresult
? aResult.get_nsresult()
: NS_ERROR_FAILURE;
NS_ReleaseOnMainThread("BlobURLInputStream::mChannel",
self->mChannel.forget());
self->NotifyWaitTargets(lock);
},
[self](mozilla::ipc::ResponseRejectReason aReason) {
MutexAutoLock lock(self->mStateMachineMutex);
NS_WARNING("IPC call to SendBlobURLDataRequest failed!");
self->mState = State::ERROR;
self->mError = NS_ERROR_FAILURE;
NS_ReleaseOnMainThread("BlobURLInputStream::mChannel",
self->mChannel.forget());
self->NotifyWaitTargets(lock);
});
}
nsresult BlobURLInputStream::StoreBlobImplStream(
already_AddRefed<BlobImpl> aBlobImpl, const MutexAutoLock& aProofOfLock) {
MOZ_ASSERT(NS_IsMainThread(), "Only call on main thread");
const RefPtr<BlobImpl> blobImppl = aBlobImpl;
nsAutoString contentType;
blobImppl->GetType(contentType);
mChannel->SetContentType(NS_ConvertUTF16toUTF8(contentType));
auto cleanupOnExit = MakeScopeExit([&] { mChannel = nullptr; });
if (blobImppl->IsFile()) {
nsAutoString filename;
blobImppl->GetName(filename);
if (!filename.IsEmpty()) {
mChannel->SetContentDispositionFilename(filename);
}
}
mozilla::ErrorResult errorResult;
mBlobSize = blobImppl->GetSize(errorResult);
if (NS_WARN_IF(errorResult.Failed())) {
return errorResult.StealNSResult();
}
mChannel->SetContentLength(mBlobSize);
nsCOMPtr<nsIInputStream> inputStream;
blobImppl->CreateInputStream(getter_AddRefs(inputStream), errorResult);
if (NS_WARN_IF(errorResult.Failed())) {
return errorResult.StealNSResult();
}
if (NS_WARN_IF(!inputStream)) {
return NS_ERROR_NOT_AVAILABLE;
}
nsresult rv = NS_MakeAsyncNonBlockingInputStream(
inputStream.forget(), getter_AddRefs(mAsyncInputStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
return rv;
}
if (NS_WARN_IF(!mAsyncInputStream)) {
return NS_ERROR_NOT_AVAILABLE;
}
return NS_OK;
}
void BlobURLInputStream::NotifyWaitTargets(const MutexAutoLock& aProofOfLock) {
if (mAsyncWaitCallback) {
auto callback = mAsyncWaitTarget
? NS_NewInputStreamReadyEvent(
"BlobURLInputStream::OnInputStreamReady",
mAsyncWaitCallback, mAsyncWaitTarget)
: mAsyncWaitCallback;
mAsyncWaitCallback = nullptr;
mAsyncWaitTarget = nullptr;
callback->OnInputStreamReady(this);
}
if (mAsyncLengthWaitCallback) {
const RefPtr<BlobURLInputStream> self = this;
nsCOMPtr<nsIRunnable> runnable = NS_NewRunnableFunction(
"BlobURLInputStream::OnInputStreamLengthReady", [self] {
self->mAsyncLengthWaitCallback->OnInputStreamLengthReady(
self, self->mBlobSize);
});
mAsyncLengthWaitCallback = nullptr;
mAsyncLengthWaitTarget = nullptr;
if (mAsyncLengthWaitTarget) {
mAsyncLengthWaitTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
} else {
runnable->Run();
}
}
}
void BlobURLInputStream::ReleaseUnderlyingStream(
const MutexAutoLock& aProofOfLock) {
mAsyncInputStream = nullptr;
mBlobSize = -1;
}
} // namespace dom
} // namespace mozilla

Просмотреть файл

@ -0,0 +1,83 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef mozilla_dom_BlobURLInputStream_h
#define mozilla_dom_BlobURLInputStream_h
#include "mozilla/dom/BlobImpl.h"
#include "mozilla/Mutex.h"
#include "nsIAsyncInputStream.h"
#include "nsIInputStreamLength.h"
namespace mozilla {
namespace dom {
class BlobURL;
class BlobURLChannel;
class BlobURLInputStream final : public nsIAsyncInputStream,
public nsIInputStreamLength,
public nsIAsyncInputStreamLength,
public nsIInputStreamCallback,
public nsIInputStreamLengthCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIINPUTSTREAM
NS_DECL_NSIASYNCINPUTSTREAM
NS_DECL_NSIINPUTSTREAMLENGTH
NS_DECL_NSIASYNCINPUTSTREAMLENGTH
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIINPUTSTREAMLENGTHCALLBACK
static already_AddRefed<nsIInputStream> Create(BlobURLChannel* const aChannel,
BlobURL* const aBlobURL);
BlobURLInputStream(BlobURLChannel* const aChannel, nsACString& aBlobURLSpec);
private:
enum class State { INITIAL, READY, WAITING, CLOSED, ERROR };
~BlobURLInputStream();
void WaitOnUnderlyingStream(const MutexAutoLock& aProofOfLock);
// This method should only be used to call RetrieveBlobData in a different
// thread
void CallRetrieveBlobData();
void RetrieveBlobData(const MutexAutoLock& aProofOfLock);
nsresult StoreBlobImplStream(already_AddRefed<BlobImpl> aBlobImpl,
const MutexAutoLock& aProofOfLock);
void NotifyWaitTargets(const MutexAutoLock& aProofOfLock);
void ReleaseUnderlyingStream(const MutexAutoLock& aProofOfLock);
RefPtr<BlobURLChannel> mChannel;
const nsCString mBlobURLSpec;
// Non-recursive mutex introduced in order to guard access to mState, mError
// and mAsyncInputStream
Mutex mStateMachineMutex;
State mState;
// Stores the error code if stream is in error state
nsresult mError;
int64_t mBlobSize;
nsCOMPtr<nsIAsyncInputStream> mAsyncInputStream;
nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
nsCOMPtr<nsIEventTarget> mAsyncWaitTarget;
uint32_t mAsyncWaitFlags;
uint32_t mAsyncWaitRequestedCount;
nsCOMPtr<nsIInputStreamLengthCallback> mAsyncLengthWaitCallback;
nsCOMPtr<nsIEventTarget> mAsyncLengthWaitTarget;
};
} // namespace dom
} // namespace mozilla
#endif /* mozilla_dom_BlobURLInputStream_h */

Просмотреть файл

@ -9,6 +9,7 @@ with Files("**"):
EXPORTS.mozilla.dom += [
'BlobURL.h',
'BlobURLInputStream.h',
'BlobURLProtocolHandler.h',
'FontTableURIProtocolHandler.h',
]
@ -16,6 +17,7 @@ EXPORTS.mozilla.dom += [
UNIFIED_SOURCES += [
'BlobURL.cpp',
'BlobURLChannel.cpp',
'BlobURLInputStream.cpp',
'BlobURLProtocolHandler.cpp',
'FontTableURIProtocolHandler.cpp',
]