Bug 1397128 P5 Add ReadStream::Inner::EnsureStream() to synchronously open stream when Availabe/Read/ReadSegments is called. r=tt

This commit is contained in:
Ben Kelly 2017-09-15 09:11:23 -07:00
Родитель cd504ea7b0
Коммит 1cc7cde5c0
1 изменённых файлов: 123 добавлений и 6 удалений

129
dom/cache/ReadStream.cpp поставляемый
Просмотреть файл

@ -13,6 +13,7 @@
#include "mozilla/ipc/IPCStreamUtils.h"
#include "mozilla/SnappyUncompressInputStream.h"
#include "nsIAsyncInputStream.h"
#include "nsStringStream.h"
#include "nsTArray.h"
namespace mozilla {
@ -92,6 +93,18 @@ private:
void
ForgetOnOwningThread();
nsIInputStream*
EnsureStream();
void
AsyncOpenStreamOnOwningThread();
void
MaybeAbortAsyncOpenStream();
void
OpenStreamFailed();
// Weak ref to the stream control actor. The actor will always call either
// CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
// weak ref is cleared in the resulting NoteClosedOnOwningThread() or
@ -109,13 +122,14 @@ private:
};
Atomic<State> mState;
Atomic<bool> mHasEverBeenRead;
bool mAsyncOpenStarted;
// The wrapped stream objects may not be threadsafe. We need to be able
// to close a stream on our owning thread while an IO thread is simultaneously
// reading the same stream. Therefore, protect all access to these stream
// objects with a mutex.
Mutex mMutex;
CondVar mCondVar;
nsCOMPtr<nsIInputStream> mStream;
nsCOMPtr<nsIInputStream> mSnappyStream;
@ -202,7 +216,9 @@ ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
, mOwningEventTarget(GetCurrentThreadSerialEventTarget())
, mState(Open)
, mHasEverBeenRead(false)
, mAsyncOpenStarted(false)
, mMutex("dom::cache::ReadStream")
, mCondVar(mMutex, "dom::cache::ReadStream")
, mStream(aStream)
, mSnappyStream(new SnappyUncompressInputStream(aStream))
{
@ -288,7 +304,9 @@ ReadStream::Inner::Close()
nsresult rv = NS_OK;
{
MutexAutoLock lock(mMutex);
rv = mSnappyStream->Close();
if (mSnappyStream) {
rv = mSnappyStream->Close();
}
}
NoteClosed();
return rv;
@ -301,7 +319,7 @@ ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
nsresult rv = NS_OK;
{
MutexAutoLock lock(mMutex);
rv = mSnappyStream->Available(aNumAvailableOut);
rv = EnsureStream()->Available(aNumAvailableOut);
}
if (NS_FAILED(rv)) {
@ -320,7 +338,7 @@ ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
nsresult rv = NS_OK;
{
MutexAutoLock lock(mMutex);
rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut);
}
if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
@ -348,7 +366,7 @@ ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
nsresult rv = NS_OK;
{
MutexAutoLock lock(mMutex);
rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
}
if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
@ -372,7 +390,11 @@ ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
{
// stream ops can happen on any thread
MutexAutoLock lock(mMutex);
return mSnappyStream->IsNonBlocking(aNonBlockingOut);
if (mSnappyStream) {
return mSnappyStream->IsNonBlocking(aNonBlockingOut);
}
*aNonBlockingOut = false;
return NS_OK;
}
ReadStream::Inner::~Inner()
@ -428,6 +450,8 @@ ReadStream::Inner::NoteClosedOnOwningThread()
return;
}
MaybeAbortAsyncOpenStream();
MOZ_DIAGNOSTIC_ASSERT(mControl);
mControl->NoteClosed(this, mId);
mControl = nullptr;
@ -443,11 +467,104 @@ ReadStream::Inner::ForgetOnOwningThread()
return;
}
MaybeAbortAsyncOpenStream();
MOZ_DIAGNOSTIC_ASSERT(mControl);
mControl->ForgetReadStream(this);
mControl = nullptr;
}
nsIInputStream*
ReadStream::Inner::EnsureStream()
{
mMutex.AssertCurrentThreadOwns();
// We need to block the current thread while we open the stream. We
// cannot do this safely from the main owning thread since it would
// trigger deadlock. This should be ok, though, since a blocking
// stream like this should never be read on the owning thread anyway.
if (mOwningEventTarget->IsOnCurrentThread()) {
MOZ_CRASH("Blocking read on the js/ipc owning thread!");
}
if (mSnappyStream) {
return mSnappyStream;
}
nsCOMPtr<nsIRunnable> r =
NewCancelableRunnableMethod("ReadStream::Inner::AsyncOpenStreamOnOwningThread",
this,
&ReadStream::Inner::AsyncOpenStreamOnOwningThread);
nsresult rv = mOwningEventTarget->Dispatch(r.forget(),
nsIThread::DISPATCH_NORMAL);
if (NS_WARN_IF(NS_FAILED(rv))) {
OpenStreamFailed();
return mSnappyStream;
}
mCondVar.Wait();
MOZ_DIAGNOSTIC_ASSERT(mSnappyStream);
return mSnappyStream;
}
void
ReadStream::Inner::AsyncOpenStreamOnOwningThread()
{
MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
if (!mControl || mState == Closed) {
MutexAutoLock lock(mMutex);
OpenStreamFailed();
mCondVar.NotifyAll();
return;
}
if (mAsyncOpenStarted) {
return;
}
mAsyncOpenStarted = true;
RefPtr<ReadStream::Inner> self = this;
mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) {
MutexAutoLock lock(self->mMutex);
self->mAsyncOpenStarted = false;
if (!self->mStream) {
if (!aStream) {
self->OpenStreamFailed();
} else {
self->mStream = Move(aStream);
self->mSnappyStream = new SnappyUncompressInputStream(self->mStream);
}
}
self->mCondVar.NotifyAll();
});
}
void
ReadStream::Inner::MaybeAbortAsyncOpenStream()
{
if (!mAsyncOpenStarted) {
return;
}
MutexAutoLock lock(mMutex);
OpenStreamFailed();
mCondVar.NotifyAll();
}
void
ReadStream::Inner::OpenStreamFailed()
{
MOZ_DIAGNOSTIC_ASSERT(!mStream);
MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream);
mMutex.AssertCurrentThreadOwns();
Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), EmptyCString());
mSnappyStream = mStream;
mStream->Close();
NoteClosed();
}
// ----------------------------------------------------------------------------
NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);