зеркало из https://github.com/mozilla/gecko-dev.git
1448 строки
40 KiB
C++
1448 строки
40 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
/**
|
|
* The multiplex stream concatenates a list of input streams into a single
|
|
* stream.
|
|
*/
|
|
|
|
#include "mozilla/Attributes.h"
|
|
#include "mozilla/CheckedInt.h"
|
|
#include "mozilla/MathAlgorithms.h"
|
|
#include "mozilla/Mutex.h"
|
|
|
|
#include "base/basictypes.h"
|
|
|
|
#include "nsMultiplexInputStream.h"
|
|
#include "nsIBufferedStreams.h"
|
|
#include "nsICloneableInputStream.h"
|
|
#include "nsIMultiplexInputStream.h"
|
|
#include "nsISeekableStream.h"
|
|
#include "nsCOMPtr.h"
|
|
#include "nsCOMArray.h"
|
|
#include "nsIClassInfoImpl.h"
|
|
#include "nsIIPCSerializableInputStream.h"
|
|
#include "mozilla/ipc/InputStreamUtils.h"
|
|
#include "nsIAsyncInputStream.h"
|
|
#include "nsIInputStreamLength.h"
|
|
#include "nsNetUtil.h"
|
|
#include "nsStreamUtils.h"
|
|
|
|
using namespace mozilla;
|
|
using namespace mozilla::ipc;
|
|
|
|
using mozilla::DeprecatedAbs;
|
|
using mozilla::Maybe;
|
|
using mozilla::Nothing;
|
|
using mozilla::Some;
|
|
|
|
class nsMultiplexInputStream final : public nsIMultiplexInputStream,
|
|
public nsISeekableStream,
|
|
public nsIIPCSerializableInputStream,
|
|
public nsICloneableInputStream,
|
|
public nsIAsyncInputStream,
|
|
public nsIInputStreamCallback,
|
|
public nsIInputStreamLength,
|
|
public nsIAsyncInputStreamLength {
|
|
public:
|
|
nsMultiplexInputStream();
|
|
|
|
NS_DECL_THREADSAFE_ISUPPORTS
|
|
NS_DECL_NSIINPUTSTREAM
|
|
NS_DECL_NSIMULTIPLEXINPUTSTREAM
|
|
NS_DECL_NSISEEKABLESTREAM
|
|
NS_DECL_NSITELLABLESTREAM
|
|
NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
|
|
NS_DECL_NSICLONEABLEINPUTSTREAM
|
|
NS_DECL_NSIASYNCINPUTSTREAM
|
|
NS_DECL_NSIINPUTSTREAMCALLBACK
|
|
NS_DECL_NSIINPUTSTREAMLENGTH
|
|
NS_DECL_NSIASYNCINPUTSTREAMLENGTH
|
|
|
|
// This is used for nsIAsyncInputStream::AsyncWait
|
|
void AsyncWaitCompleted();
|
|
|
|
// This is used for nsIAsyncInputStreamLength::AsyncLengthWait
|
|
void AsyncWaitCompleted(int64_t aLength, const MutexAutoLock& aProofOfLock);
|
|
|
|
struct StreamData {
|
|
nsresult Initialize(nsIInputStream* aOriginalStream) {
|
|
mCurrentPos = 0;
|
|
|
|
mOriginalStream = aOriginalStream;
|
|
|
|
mBufferedStream = aOriginalStream;
|
|
if (!NS_InputStreamIsBuffered(mBufferedStream)) {
|
|
nsCOMPtr<nsIInputStream> bufferedStream;
|
|
nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(bufferedStream),
|
|
mBufferedStream.forget(), 4096);
|
|
NS_ENSURE_SUCCESS(rv, rv);
|
|
mBufferedStream = bufferedStream;
|
|
}
|
|
|
|
mAsyncStream = do_QueryInterface(mBufferedStream);
|
|
mSeekableStream = do_QueryInterface(mBufferedStream);
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
nsCOMPtr<nsIInputStream> mOriginalStream;
|
|
|
|
// Equal to mOriginalStream or a wrap around the original stream to make it
|
|
// buffered.
|
|
nsCOMPtr<nsIInputStream> mBufferedStream;
|
|
|
|
// This can be null.
|
|
nsCOMPtr<nsIAsyncInputStream> mAsyncStream;
|
|
// This can be null.
|
|
nsCOMPtr<nsISeekableStream> mSeekableStream;
|
|
|
|
uint64_t mCurrentPos;
|
|
};
|
|
|
|
Mutex& GetLock() { return mLock; }
|
|
|
|
private:
|
|
~nsMultiplexInputStream() = default;
|
|
|
|
nsresult AsyncWaitInternal();
|
|
|
|
// This method updates mSeekableStreams, mTellableStreams,
|
|
// mIPCSerializableStreams and mCloneableStreams values.
|
|
void UpdateQIMap(StreamData& aStream);
|
|
|
|
struct MOZ_STACK_CLASS ReadSegmentsState {
|
|
nsCOMPtr<nsIInputStream> mThisStream;
|
|
uint32_t mOffset;
|
|
nsWriteSegmentFun mWriter;
|
|
void* mClosure;
|
|
bool mDone;
|
|
};
|
|
|
|
template <typename M>
|
|
void SerializeInternal(InputStreamParams& aParams,
|
|
FileDescriptorArray& aFileDescriptors,
|
|
bool aDelayedStart, uint32_t aMaxSize,
|
|
uint32_t* aSizeUsed, M* aManager);
|
|
|
|
static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
|
|
const char* aFromRawSegment, uint32_t aToOffset,
|
|
uint32_t aCount, uint32_t* aWriteCount);
|
|
|
|
bool IsSeekable() const;
|
|
bool IsIPCSerializable() const;
|
|
bool IsCloneable() const;
|
|
bool IsAsyncInputStream() const;
|
|
bool IsInputStreamLength() const;
|
|
bool IsAsyncInputStreamLength() const;
|
|
|
|
Mutex mLock; // Protects access to all data members.
|
|
|
|
nsTArray<StreamData> mStreams;
|
|
|
|
uint32_t mCurrentStream;
|
|
bool mStartedReadingCurrent;
|
|
nsresult mStatus;
|
|
nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
|
|
uint32_t mAsyncWaitFlags;
|
|
uint32_t mAsyncWaitRequestedCount;
|
|
nsCOMPtr<nsIEventTarget> mAsyncWaitEventTarget;
|
|
nsCOMPtr<nsIInputStreamLengthCallback> mAsyncWaitLengthCallback;
|
|
|
|
class AsyncWaitLengthHelper;
|
|
RefPtr<AsyncWaitLengthHelper> mAsyncWaitLengthHelper;
|
|
|
|
uint32_t mSeekableStreams;
|
|
uint32_t mIPCSerializableStreams;
|
|
uint32_t mCloneableStreams;
|
|
uint32_t mAsyncInputStreams;
|
|
uint32_t mInputStreamLengths;
|
|
uint32_t mAsyncInputStreamLengths;
|
|
};
|
|
|
|
NS_IMPL_ADDREF(nsMultiplexInputStream)
|
|
NS_IMPL_RELEASE(nsMultiplexInputStream)
|
|
|
|
NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
|
|
NS_MULTIPLEXINPUTSTREAM_CID)
|
|
|
|
NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
|
|
NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
|
|
NS_INTERFACE_MAP_ENTRY(nsIInputStream)
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
|
|
NS_INTERFACE_MAP_ENTRY(nsITellableStream)
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
|
|
IsIPCSerializable())
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream, IsCloneable())
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream, IsAsyncInputStream())
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
|
|
IsAsyncInputStream())
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamLength,
|
|
IsInputStreamLength())
|
|
NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStreamLength,
|
|
IsAsyncInputStreamLength())
|
|
NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
|
|
NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
|
|
NS_INTERFACE_MAP_END
|
|
|
|
NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream, nsIMultiplexInputStream,
|
|
nsIInputStream, nsISeekableStream,
|
|
nsITellableStream)
|
|
|
|
static nsresult AvailableMaybeSeek(nsMultiplexInputStream::StreamData& aStream,
|
|
uint64_t* aResult) {
|
|
nsresult rv = aStream.mBufferedStream->Available(aResult);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
// Blindly seek to the current position if Available() returns
|
|
// NS_BASE_STREAM_CLOSED.
|
|
// If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
|
|
// Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
|
|
if (aStream.mSeekableStream) {
|
|
nsresult rvSeek =
|
|
aStream.mSeekableStream->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
|
|
if (NS_SUCCEEDED(rvSeek)) {
|
|
rv = aStream.mBufferedStream->Available(aResult);
|
|
}
|
|
}
|
|
}
|
|
return rv;
|
|
}
|
|
|
|
nsMultiplexInputStream::nsMultiplexInputStream()
|
|
: mLock("nsMultiplexInputStream lock"),
|
|
mCurrentStream(0),
|
|
mStartedReadingCurrent(false),
|
|
mStatus(NS_OK),
|
|
mAsyncWaitFlags(0),
|
|
mAsyncWaitRequestedCount(0),
|
|
mSeekableStreams(0),
|
|
mIPCSerializableStreams(0),
|
|
mCloneableStreams(0),
|
|
mAsyncInputStreams(0),
|
|
mInputStreamLengths(0),
|
|
mAsyncInputStreamLengths(0) {}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::GetCount(uint32_t* aCount) {
|
|
MutexAutoLock lock(mLock);
|
|
*aCount = mStreams.Length();
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::AppendStream(nsIInputStream* aStream) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
StreamData* streamData = mStreams.AppendElement(fallible);
|
|
if (NS_WARN_IF(!streamData)) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
|
|
nsresult rv = streamData->Initialize(aStream);
|
|
NS_ENSURE_SUCCESS(rv, rv);
|
|
|
|
UpdateQIMap(*streamData);
|
|
|
|
if (mStatus == NS_BASE_STREAM_CLOSED) {
|
|
// We were closed, but now we have more data to read.
|
|
mStatus = NS_OK;
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (aIndex >= mStreams.Length()) {
|
|
return NS_ERROR_NOT_AVAILABLE;
|
|
}
|
|
|
|
StreamData& streamData = mStreams.ElementAt(aIndex);
|
|
nsCOMPtr<nsIInputStream> stream = streamData.mOriginalStream;
|
|
stream.forget(aResult);
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Close() {
|
|
nsTArray<nsCOMPtr<nsIInputStream>> streams;
|
|
|
|
// Let's take a copy of the streams becuase, calling close() it could trigger
|
|
// a nsIInputStreamCallback immediately and we don't want to create a deadlock
|
|
// with mutex.
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
uint32_t len = mStreams.Length();
|
|
for (uint32_t i = 0; i < len; ++i) {
|
|
if (NS_WARN_IF(
|
|
!streams.AppendElement(mStreams[i].mBufferedStream, fallible))) {
|
|
mStatus = NS_BASE_STREAM_CLOSED;
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
}
|
|
mStatus = NS_BASE_STREAM_CLOSED;
|
|
}
|
|
|
|
nsresult rv = NS_OK;
|
|
|
|
uint32_t len = streams.Length();
|
|
for (uint32_t i = 0; i < len; ++i) {
|
|
nsresult rv2 = streams[i]->Close();
|
|
// We still want to close all streams, but we should return an error
|
|
if (NS_FAILED(rv2)) {
|
|
rv = rv2;
|
|
}
|
|
}
|
|
|
|
return rv;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Available(uint64_t* aResult) {
|
|
*aResult = 0;
|
|
|
|
MutexAutoLock lock(mLock);
|
|
if (NS_FAILED(mStatus)) {
|
|
return mStatus;
|
|
}
|
|
|
|
uint64_t avail = 0;
|
|
nsresult rv = NS_BASE_STREAM_CLOSED;
|
|
|
|
uint32_t len = mStreams.Length();
|
|
for (uint32_t i = mCurrentStream; i < len; i++) {
|
|
uint64_t streamAvail;
|
|
rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
// If a stream is closed, we continue with the next one.
|
|
// If this is the current stream we move to the following stream.
|
|
if (mCurrentStream == i) {
|
|
++mCurrentStream;
|
|
}
|
|
|
|
// If this is the last stream, we want to return this error code.
|
|
continue;
|
|
}
|
|
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
mStatus = rv;
|
|
return mStatus;
|
|
}
|
|
|
|
// If the current stream is async, we have to return what we have so far
|
|
// without processing the following streams. This is needed because
|
|
// ::Available should return only what is currently available. In case of an
|
|
// nsIAsyncInputStream, we have to call AsyncWait() in order to read more.
|
|
if (mStreams[i].mAsyncStream) {
|
|
avail += streamAvail;
|
|
break;
|
|
}
|
|
|
|
if (streamAvail == 0) {
|
|
// Nothing to read for this stream. Let's move to the next one.
|
|
continue;
|
|
}
|
|
|
|
avail += streamAvail;
|
|
}
|
|
|
|
// We still have something to read. We don't want to return an error code yet.
|
|
if (avail) {
|
|
*aResult = avail;
|
|
return NS_OK;
|
|
}
|
|
|
|
// Let's propagate the last error message.
|
|
mStatus = rv;
|
|
return rv;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
|
|
MutexAutoLock lock(mLock);
|
|
// It is tempting to implement this method in terms of ReadSegments, but
|
|
// that would prevent this class from being used with streams that only
|
|
// implement Read (e.g., file streams).
|
|
|
|
*aResult = 0;
|
|
|
|
if (mStatus == NS_BASE_STREAM_CLOSED) {
|
|
return NS_OK;
|
|
}
|
|
if (NS_FAILED(mStatus)) {
|
|
return mStatus;
|
|
}
|
|
|
|
nsresult rv = NS_OK;
|
|
|
|
uint32_t len = mStreams.Length();
|
|
while (mCurrentStream < len && aCount) {
|
|
uint32_t read;
|
|
rv = mStreams[mCurrentStream].mBufferedStream->Read(aBuf, aCount, &read);
|
|
|
|
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
|
|
// (This is a bug in those stream implementations)
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
MOZ_ASSERT_UNREACHABLE(
|
|
"Input stream's Read method returned "
|
|
"NS_BASE_STREAM_CLOSED");
|
|
rv = NS_OK;
|
|
read = 0;
|
|
} else if (NS_FAILED(rv)) {
|
|
break;
|
|
}
|
|
|
|
if (read == 0) {
|
|
++mCurrentStream;
|
|
mStartedReadingCurrent = false;
|
|
} else {
|
|
NS_ASSERTION(aCount >= read, "Read more than requested");
|
|
*aResult += read;
|
|
aCount -= read;
|
|
aBuf += read;
|
|
mStartedReadingCurrent = true;
|
|
|
|
mStreams[mCurrentStream].mCurrentPos += read;
|
|
}
|
|
}
|
|
return *aResult ? NS_OK : rv;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
|
|
uint32_t aCount, uint32_t* aResult) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (mStatus == NS_BASE_STREAM_CLOSED) {
|
|
*aResult = 0;
|
|
return NS_OK;
|
|
}
|
|
if (NS_FAILED(mStatus)) {
|
|
return mStatus;
|
|
}
|
|
|
|
NS_ASSERTION(aWriter, "missing aWriter");
|
|
|
|
nsresult rv = NS_OK;
|
|
ReadSegmentsState state;
|
|
state.mThisStream = this;
|
|
state.mOffset = 0;
|
|
state.mWriter = aWriter;
|
|
state.mClosure = aClosure;
|
|
state.mDone = false;
|
|
|
|
uint32_t len = mStreams.Length();
|
|
while (mCurrentStream < len && aCount) {
|
|
uint32_t read;
|
|
rv = mStreams[mCurrentStream].mBufferedStream->ReadSegments(
|
|
ReadSegCb, &state, aCount, &read);
|
|
|
|
// XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
|
|
// (This is a bug in those stream implementations)
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
MOZ_ASSERT_UNREACHABLE(
|
|
"Input stream's Read method returned "
|
|
"NS_BASE_STREAM_CLOSED");
|
|
rv = NS_OK;
|
|
read = 0;
|
|
}
|
|
|
|
// if |aWriter| decided to stop reading segments...
|
|
if (state.mDone || NS_FAILED(rv)) {
|
|
break;
|
|
}
|
|
|
|
// if stream is empty, then advance to the next stream.
|
|
if (read == 0) {
|
|
++mCurrentStream;
|
|
mStartedReadingCurrent = false;
|
|
} else {
|
|
NS_ASSERTION(aCount >= read, "Read more than requested");
|
|
state.mOffset += read;
|
|
aCount -= read;
|
|
mStartedReadingCurrent = true;
|
|
|
|
mStreams[mCurrentStream].mCurrentPos += read;
|
|
}
|
|
}
|
|
|
|
// if we successfully read some data, then this call succeeded.
|
|
*aResult = state.mOffset;
|
|
return state.mOffset ? NS_OK : rv;
|
|
}
|
|
|
|
nsresult nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
|
|
const char* aFromRawSegment,
|
|
uint32_t aToOffset, uint32_t aCount,
|
|
uint32_t* aWriteCount) {
|
|
nsresult rv;
|
|
ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
|
|
rv = (state->mWriter)(state->mThisStream, state->mClosure, aFromRawSegment,
|
|
aToOffset + state->mOffset, aCount, aWriteCount);
|
|
if (NS_FAILED(rv)) {
|
|
state->mDone = true;
|
|
}
|
|
return rv;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
uint32_t len = mStreams.Length();
|
|
if (len == 0) {
|
|
// Claim to be non-blocking, since we won't block the caller.
|
|
*aNonBlocking = true;
|
|
return NS_OK;
|
|
}
|
|
|
|
for (uint32_t i = 0; i < len; ++i) {
|
|
nsresult rv = mStreams[i].mBufferedStream->IsNonBlocking(aNonBlocking);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
// If one is blocking the entire stream becomes blocking.
|
|
if (!*aNonBlocking) {
|
|
return NS_OK;
|
|
}
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (NS_FAILED(mStatus)) {
|
|
return mStatus;
|
|
}
|
|
|
|
nsresult rv;
|
|
|
|
uint32_t oldCurrentStream = mCurrentStream;
|
|
bool oldStartedReadingCurrent = mStartedReadingCurrent;
|
|
|
|
if (aWhence == NS_SEEK_SET) {
|
|
int64_t remaining = aOffset;
|
|
if (aOffset == 0) {
|
|
mCurrentStream = 0;
|
|
}
|
|
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
|
|
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
|
|
if (!stream) {
|
|
return NS_ERROR_FAILURE;
|
|
}
|
|
|
|
// See if all remaining streams should be rewound
|
|
if (remaining == 0) {
|
|
if (i < oldCurrentStream ||
|
|
(i == oldCurrentStream && oldStartedReadingCurrent)) {
|
|
rv = stream->Seek(NS_SEEK_SET, 0);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos = 0;
|
|
continue;
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// Get position in the current stream
|
|
int64_t streamPos;
|
|
if (i > oldCurrentStream ||
|
|
(i == oldCurrentStream && !oldStartedReadingCurrent)) {
|
|
streamPos = 0;
|
|
} else {
|
|
streamPos = mStreams[i].mCurrentPos;
|
|
}
|
|
|
|
// See if we need to seek the current stream forward or backward
|
|
if (remaining < streamPos) {
|
|
rv = stream->Seek(NS_SEEK_SET, remaining);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos = remaining;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = remaining != 0;
|
|
|
|
remaining = 0;
|
|
} else if (remaining > streamPos) {
|
|
if (i < oldCurrentStream) {
|
|
// We're already at end so no need to seek this stream
|
|
remaining -= streamPos;
|
|
NS_ASSERTION(remaining >= 0, "Remaining invalid");
|
|
} else {
|
|
uint64_t avail;
|
|
rv = AvailableMaybeSeek(mStreams[i], &avail);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
|
|
|
|
rv = stream->Seek(NS_SEEK_SET, newPos);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos = newPos;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = true;
|
|
|
|
remaining -= newPos;
|
|
NS_ASSERTION(remaining >= 0, "Remaining invalid");
|
|
}
|
|
} else {
|
|
NS_ASSERTION(remaining == streamPos, "Huh?");
|
|
MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
|
|
remaining = 0;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = true;
|
|
}
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
if (aWhence == NS_SEEK_CUR && aOffset > 0) {
|
|
int64_t remaining = aOffset;
|
|
for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
|
|
uint64_t avail;
|
|
rv = AvailableMaybeSeek(mStreams[i], &avail);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
|
|
|
|
rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, seek);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos += seek;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = true;
|
|
|
|
remaining -= seek;
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
if (aWhence == NS_SEEK_CUR && aOffset < 0) {
|
|
int64_t remaining = -aOffset;
|
|
for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
|
|
int64_t pos = mStreams[i].mCurrentPos;
|
|
|
|
int64_t seek = XPCOM_MIN(pos, remaining);
|
|
|
|
rv = mStreams[i].mSeekableStream->Seek(NS_SEEK_CUR, -seek);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos -= seek;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = seek != -pos;
|
|
|
|
remaining -= seek;
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
if (aWhence == NS_SEEK_CUR) {
|
|
NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
if (aWhence == NS_SEEK_END) {
|
|
if (aOffset > 0) {
|
|
return NS_ERROR_INVALID_ARG;
|
|
}
|
|
|
|
int64_t remaining = aOffset;
|
|
int32_t i;
|
|
for (i = mStreams.Length() - 1; i >= 0; --i) {
|
|
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
|
|
|
|
uint64_t avail;
|
|
rv = AvailableMaybeSeek(mStreams[i], &avail);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
int64_t streamLength = avail + mStreams[i].mCurrentPos;
|
|
|
|
// The seek(END) can be completed in the current stream.
|
|
if (streamLength >= DeprecatedAbs(remaining)) {
|
|
rv = stream->Seek(NS_SEEK_END, remaining);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos = streamLength + remaining;
|
|
mCurrentStream = i;
|
|
mStartedReadingCurrent = true;
|
|
break;
|
|
}
|
|
|
|
// We are at the beginning of this stream.
|
|
rv = stream->Seek(NS_SEEK_SET, 0);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
remaining += streamLength;
|
|
mStreams[i].mCurrentPos = 0;
|
|
}
|
|
|
|
// Any other stream must be set to the end.
|
|
for (--i; i >= 0; --i) {
|
|
nsCOMPtr<nsISeekableStream> stream = mStreams[i].mSeekableStream;
|
|
|
|
uint64_t avail;
|
|
rv = AvailableMaybeSeek(mStreams[i], &avail);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
int64_t streamLength = avail + mStreams[i].mCurrentPos;
|
|
|
|
rv = stream->Seek(NS_SEEK_END, 0);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mStreams[i].mCurrentPos = streamLength;
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
// other Seeks not implemented yet
|
|
return NS_ERROR_NOT_IMPLEMENTED;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Tell(int64_t* aResult) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (NS_FAILED(mStatus)) {
|
|
return mStatus;
|
|
}
|
|
|
|
int64_t ret64 = 0;
|
|
#ifdef DEBUG
|
|
bool zeroFound = false;
|
|
#endif
|
|
|
|
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
|
|
ret64 += mStreams[i].mCurrentPos;
|
|
|
|
#ifdef DEBUG
|
|
// When we see 1 stream with currentPos = 0, all the remaining streams must
|
|
// be set to 0 as well.
|
|
MOZ_ASSERT_IF(zeroFound, mStreams[i].mCurrentPos == 0);
|
|
if (mStreams[i].mCurrentPos == 0) {
|
|
zeroFound = true;
|
|
}
|
|
#endif
|
|
}
|
|
*aResult = ret64;
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::SetEOF() { return NS_ERROR_NOT_IMPLEMENTED; }
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::CloseWithStatus(nsresult aStatus) { return Close(); }
|
|
|
|
// This class is used to inform nsMultiplexInputStream that it's time to execute
|
|
// the asyncWait callback.
|
|
class AsyncWaitRunnable final : public DiscardableRunnable {
|
|
RefPtr<nsMultiplexInputStream> mStream;
|
|
|
|
public:
|
|
static void Create(nsMultiplexInputStream* aStream,
|
|
nsIEventTarget* aEventTarget) {
|
|
RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(aStream);
|
|
if (aEventTarget) {
|
|
aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
|
|
} else {
|
|
runnable->Run();
|
|
}
|
|
}
|
|
|
|
NS_IMETHOD
|
|
Run() override {
|
|
mStream->AsyncWaitCompleted();
|
|
return NS_OK;
|
|
}
|
|
|
|
private:
|
|
explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
|
|
: DiscardableRunnable("AsyncWaitRunnable"), mStream(aStream) {
|
|
MOZ_ASSERT(aStream);
|
|
}
|
|
};
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
|
|
uint32_t aFlags, uint32_t aRequestedCount,
|
|
nsIEventTarget* aEventTarget) {
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
|
|
// We must execute the callback also when the stream is closed.
|
|
if (NS_FAILED(mStatus) && mStatus != NS_BASE_STREAM_CLOSED) {
|
|
return mStatus;
|
|
}
|
|
|
|
if (mAsyncWaitCallback && aCallback) {
|
|
return NS_ERROR_FAILURE;
|
|
}
|
|
|
|
mAsyncWaitCallback = aCallback;
|
|
mAsyncWaitFlags = aFlags;
|
|
mAsyncWaitRequestedCount = aRequestedCount;
|
|
mAsyncWaitEventTarget = aEventTarget;
|
|
|
|
if (!mAsyncWaitCallback) {
|
|
return NS_OK;
|
|
}
|
|
}
|
|
|
|
return AsyncWaitInternal();
|
|
}
|
|
|
|
nsresult nsMultiplexInputStream::AsyncWaitInternal() {
|
|
nsCOMPtr<nsIAsyncInputStream> stream;
|
|
uint32_t asyncWaitFlags = 0;
|
|
uint32_t asyncWaitRequestedCount = 0;
|
|
nsCOMPtr<nsIEventTarget> asyncWaitEventTarget;
|
|
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
|
|
// Let's take the first async stream if we are not already closed, and if
|
|
// it has data to read or if it async.
|
|
if (mStatus != NS_BASE_STREAM_CLOSED) {
|
|
for (; mCurrentStream < mStreams.Length(); ++mCurrentStream) {
|
|
stream = mStreams[mCurrentStream].mAsyncStream;
|
|
if (stream) {
|
|
break;
|
|
}
|
|
|
|
uint64_t avail = 0;
|
|
nsresult rv = AvailableMaybeSeek(mStreams[mCurrentStream], &avail);
|
|
if (rv == NS_BASE_STREAM_CLOSED || (NS_SUCCEEDED(rv) && avail == 0)) {
|
|
// Nothing to read here. Let's move on.
|
|
continue;
|
|
}
|
|
|
|
if (NS_FAILED(rv)) {
|
|
return rv;
|
|
}
|
|
|
|
break;
|
|
}
|
|
}
|
|
|
|
asyncWaitFlags = mAsyncWaitFlags;
|
|
asyncWaitRequestedCount = mAsyncWaitRequestedCount;
|
|
asyncWaitEventTarget = mAsyncWaitEventTarget;
|
|
}
|
|
|
|
MOZ_ASSERT_IF(stream, NS_SUCCEEDED(mStatus));
|
|
|
|
// If we are here it's because we are already closed, or if the current stream
|
|
// is not async. In both case we have to execute the callback.
|
|
if (!stream) {
|
|
AsyncWaitRunnable::Create(this, asyncWaitEventTarget);
|
|
return NS_OK;
|
|
}
|
|
|
|
return stream->AsyncWait(this, asyncWaitFlags, asyncWaitRequestedCount,
|
|
asyncWaitEventTarget);
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
|
|
nsCOMPtr<nsIInputStreamCallback> callback;
|
|
|
|
// When OnInputStreamReady is called, we could be in 2 scenarios:
|
|
// a. there is something to read;
|
|
// b. the stream is closed.
|
|
// But if the stream is closed and it was not the last one, we must proceed
|
|
// with the following stream in order to have something to read by the callee.
|
|
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
|
|
// The callback has been nullified in the meantime.
|
|
if (!mAsyncWaitCallback) {
|
|
return NS_OK;
|
|
}
|
|
|
|
if (NS_SUCCEEDED(mStatus)) {
|
|
uint64_t avail = 0;
|
|
nsresult rv = aStream->Available(&avail);
|
|
if (rv == NS_BASE_STREAM_CLOSED || avail == 0) {
|
|
// This stream is closed or empty, let's move to the following one.
|
|
++mCurrentStream;
|
|
MutexAutoUnlock unlock(mLock);
|
|
return AsyncWaitInternal();
|
|
}
|
|
}
|
|
|
|
mAsyncWaitCallback.swap(callback);
|
|
mAsyncWaitEventTarget = nullptr;
|
|
}
|
|
|
|
return callback->OnInputStreamReady(this);
|
|
}
|
|
|
|
void nsMultiplexInputStream::AsyncWaitCompleted() {
|
|
nsCOMPtr<nsIInputStreamCallback> callback;
|
|
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
|
|
// The callback has been nullified in the meantime.
|
|
if (!mAsyncWaitCallback) {
|
|
return;
|
|
}
|
|
|
|
mAsyncWaitCallback.swap(callback);
|
|
mAsyncWaitEventTarget = nullptr;
|
|
}
|
|
|
|
callback->OnInputStreamReady(this);
|
|
}
|
|
|
|
nsresult nsMultiplexInputStreamConstructor(nsISupports* aOuter, REFNSIID aIID,
|
|
void** aResult) {
|
|
*aResult = nullptr;
|
|
|
|
if (aOuter) {
|
|
return NS_ERROR_NO_AGGREGATION;
|
|
}
|
|
|
|
RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
|
|
|
|
return inst->QueryInterface(aIID, aResult);
|
|
}
|
|
|
|
void nsMultiplexInputStream::Serialize(
|
|
InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
|
|
bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed,
|
|
mozilla::ipc::ParentToChildStreamActorManager* aManager) {
|
|
SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
|
|
aSizeUsed, aManager);
|
|
}
|
|
|
|
void nsMultiplexInputStream::Serialize(
|
|
InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
|
|
bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed,
|
|
mozilla::ipc::ChildToParentStreamActorManager* aManager) {
|
|
SerializeInternal(aParams, aFileDescriptors, aDelayedStart, aMaxSize,
|
|
aSizeUsed, aManager);
|
|
}
|
|
|
|
template <typename M>
|
|
void nsMultiplexInputStream::SerializeInternal(
|
|
InputStreamParams& aParams, FileDescriptorArray& aFileDescriptors,
|
|
bool aDelayedStart, uint32_t aMaxSize, uint32_t* aSizeUsed, M* aManager) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
MultiplexInputStreamParams params;
|
|
|
|
CheckedUint32 totalSizeUsed = 0;
|
|
CheckedUint32 maxSize = aMaxSize;
|
|
|
|
uint32_t streamCount = mStreams.Length();
|
|
if (streamCount) {
|
|
nsTArray<InputStreamParams>& streams = params.streams();
|
|
|
|
streams.SetCapacity(streamCount);
|
|
for (uint32_t index = 0; index < streamCount; index++) {
|
|
uint32_t sizeUsed = 0;
|
|
InputStreamHelper::SerializeInputStream(
|
|
mStreams[index].mOriginalStream, *streams.AppendElement(),
|
|
aFileDescriptors, aDelayedStart, maxSize.value(), &sizeUsed,
|
|
aManager);
|
|
|
|
MOZ_ASSERT(maxSize.value() >= sizeUsed);
|
|
|
|
maxSize -= sizeUsed;
|
|
MOZ_DIAGNOSTIC_ASSERT(maxSize.isValid());
|
|
|
|
totalSizeUsed += sizeUsed;
|
|
MOZ_DIAGNOSTIC_ASSERT(totalSizeUsed.isValid());
|
|
}
|
|
}
|
|
|
|
params.currentStream() = mCurrentStream;
|
|
params.status() = mStatus;
|
|
params.startedReadingCurrent() = mStartedReadingCurrent;
|
|
|
|
aParams = std::move(params);
|
|
|
|
MOZ_ASSERT(aSizeUsed);
|
|
*aSizeUsed = totalSizeUsed.value();
|
|
}
|
|
|
|
bool nsMultiplexInputStream::Deserialize(
|
|
const InputStreamParams& aParams,
|
|
const FileDescriptorArray& aFileDescriptors) {
|
|
if (aParams.type() != InputStreamParams::TMultiplexInputStreamParams) {
|
|
NS_ERROR("Received unknown parameters from the other process!");
|
|
return false;
|
|
}
|
|
|
|
const MultiplexInputStreamParams& params =
|
|
aParams.get_MultiplexInputStreamParams();
|
|
|
|
const nsTArray<InputStreamParams>& streams = params.streams();
|
|
|
|
uint32_t streamCount = streams.Length();
|
|
for (uint32_t index = 0; index < streamCount; index++) {
|
|
nsCOMPtr<nsIInputStream> stream = InputStreamHelper::DeserializeInputStream(
|
|
streams[index], aFileDescriptors);
|
|
if (!stream) {
|
|
NS_WARNING("Deserialize failed!");
|
|
return false;
|
|
}
|
|
|
|
if (NS_FAILED(AppendStream(stream))) {
|
|
NS_WARNING("AppendStream failed!");
|
|
return false;
|
|
}
|
|
}
|
|
|
|
mCurrentStream = params.currentStream();
|
|
mStatus = params.status();
|
|
mStartedReadingCurrent = params.startedReadingCurrent();
|
|
|
|
return true;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::GetCloneable(bool* aCloneable) {
|
|
MutexAutoLock lock(mLock);
|
|
// XXXnsm Cloning a multiplex stream which has started reading is not
|
|
// permitted right now.
|
|
if (mCurrentStream > 0 || mStartedReadingCurrent) {
|
|
*aCloneable = false;
|
|
return NS_OK;
|
|
}
|
|
|
|
uint32_t len = mStreams.Length();
|
|
for (uint32_t i = 0; i < len; ++i) {
|
|
nsCOMPtr<nsICloneableInputStream> cis =
|
|
do_QueryInterface(mStreams[i].mBufferedStream);
|
|
if (!cis || !cis->GetCloneable()) {
|
|
*aCloneable = false;
|
|
return NS_OK;
|
|
}
|
|
}
|
|
|
|
*aCloneable = true;
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Clone(nsIInputStream** aClone) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
// XXXnsm Cloning a multiplex stream which has started reading is not
|
|
// permitted right now.
|
|
if (mCurrentStream > 0 || mStartedReadingCurrent) {
|
|
return NS_ERROR_FAILURE;
|
|
}
|
|
|
|
RefPtr<nsMultiplexInputStream> clone = new nsMultiplexInputStream();
|
|
|
|
nsresult rv;
|
|
uint32_t len = mStreams.Length();
|
|
for (uint32_t i = 0; i < len; ++i) {
|
|
nsCOMPtr<nsICloneableInputStream> substream =
|
|
do_QueryInterface(mStreams[i].mBufferedStream);
|
|
if (NS_WARN_IF(!substream)) {
|
|
return NS_ERROR_FAILURE;
|
|
}
|
|
|
|
nsCOMPtr<nsIInputStream> clonedSubstream;
|
|
rv = substream->Clone(getter_AddRefs(clonedSubstream));
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
rv = clone->AppendStream(clonedSubstream);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
}
|
|
|
|
clone.forget(aClone);
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::Length(int64_t* aLength) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (mCurrentStream > 0 || mStartedReadingCurrent) {
|
|
return NS_ERROR_NOT_AVAILABLE;
|
|
}
|
|
|
|
CheckedInt64 length = 0;
|
|
nsresult retval = NS_OK;
|
|
|
|
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
|
|
nsCOMPtr<nsIInputStreamLength> substream =
|
|
do_QueryInterface(mStreams[i].mBufferedStream);
|
|
if (!substream) {
|
|
// Let's use available as fallback.
|
|
uint64_t streamAvail = 0;
|
|
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
continue;
|
|
}
|
|
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
mStatus = rv;
|
|
return mStatus;
|
|
}
|
|
|
|
length += streamAvail;
|
|
if (!length.isValid()) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
|
|
continue;
|
|
}
|
|
|
|
int64_t size = 0;
|
|
nsresult rv = substream->Length(&size);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
continue;
|
|
}
|
|
|
|
if (rv == NS_ERROR_NOT_AVAILABLE) {
|
|
return rv;
|
|
}
|
|
|
|
// If one stream blocks, we all block.
|
|
if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
// We want to return WOULD_BLOCK if there is 1 stream that blocks. But want
|
|
// to see if there are other streams with length = -1.
|
|
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
|
|
retval = NS_BASE_STREAM_WOULD_BLOCK;
|
|
continue;
|
|
}
|
|
|
|
// If one of the stream doesn't know the size, we all don't know the size.
|
|
if (size == -1) {
|
|
*aLength = -1;
|
|
return NS_OK;
|
|
}
|
|
|
|
length += size;
|
|
if (!length.isValid()) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
}
|
|
|
|
*aLength = length.value();
|
|
return retval;
|
|
}
|
|
|
|
class nsMultiplexInputStream::AsyncWaitLengthHelper final
|
|
: public nsIInputStreamLengthCallback
|
|
|
|
{
|
|
public:
|
|
NS_DECL_ISUPPORTS
|
|
|
|
AsyncWaitLengthHelper()
|
|
: mStreamNotified(false), mLength(0), mNegativeSize(false) {}
|
|
|
|
bool AddStream(nsIAsyncInputStreamLength* aStream) {
|
|
return mPendingStreams.AppendElement(aStream, fallible);
|
|
}
|
|
|
|
bool AddSize(int64_t aSize) {
|
|
MOZ_ASSERT(!mNegativeSize);
|
|
|
|
mLength += aSize;
|
|
return mLength.isValid();
|
|
}
|
|
|
|
void NegativeSize() {
|
|
MOZ_ASSERT(!mNegativeSize);
|
|
mNegativeSize = true;
|
|
}
|
|
|
|
nsresult Proceed(nsMultiplexInputStream* aParentStream,
|
|
nsIEventTarget* aEventTarget,
|
|
const MutexAutoLock& aProofOfLock) {
|
|
MOZ_ASSERT(!mStream);
|
|
|
|
// If we don't need to wait, let's inform the callback immediately.
|
|
if (mPendingStreams.IsEmpty() || mNegativeSize) {
|
|
RefPtr<nsMultiplexInputStream> parentStream = aParentStream;
|
|
int64_t length = -1;
|
|
if (!mNegativeSize && mLength.isValid()) {
|
|
length = mLength.value();
|
|
}
|
|
nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
|
|
"AsyncWaitLengthHelper", [parentStream, length]() {
|
|
MutexAutoLock lock(parentStream->GetLock());
|
|
parentStream->AsyncWaitCompleted(length, lock);
|
|
});
|
|
return aEventTarget->Dispatch(r.forget(), NS_DISPATCH_NORMAL);
|
|
}
|
|
|
|
// Let's store the callback and the parent stream until we have
|
|
// notifications from the async length streams.
|
|
|
|
mStream = aParentStream;
|
|
|
|
// Let's activate all the pending streams.
|
|
for (nsIAsyncInputStreamLength* stream : mPendingStreams) {
|
|
nsresult rv = stream->AsyncLengthWait(this, aEventTarget);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
continue;
|
|
}
|
|
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
}
|
|
|
|
return NS_OK;
|
|
}
|
|
|
|
NS_IMETHOD
|
|
OnInputStreamLengthReady(nsIAsyncInputStreamLength* aStream,
|
|
int64_t aLength) override {
|
|
MutexAutoLock lock(mStream->GetLock());
|
|
|
|
MOZ_ASSERT(mPendingStreams.Contains(aStream));
|
|
mPendingStreams.RemoveElement(aStream);
|
|
|
|
// Already notified.
|
|
if (mStreamNotified) {
|
|
return NS_OK;
|
|
}
|
|
|
|
if (aLength == -1) {
|
|
mNegativeSize = true;
|
|
} else {
|
|
mLength += aLength;
|
|
if (!mLength.isValid()) {
|
|
mNegativeSize = true;
|
|
}
|
|
}
|
|
|
|
// We need to wait.
|
|
if (!mNegativeSize && !mPendingStreams.IsEmpty()) {
|
|
return NS_OK;
|
|
}
|
|
|
|
// Let's notify the parent stream.
|
|
mStreamNotified = true;
|
|
mStream->AsyncWaitCompleted(mNegativeSize ? -1 : mLength.value(), lock);
|
|
return NS_OK;
|
|
}
|
|
|
|
private:
|
|
~AsyncWaitLengthHelper() = default;
|
|
|
|
RefPtr<nsMultiplexInputStream> mStream;
|
|
bool mStreamNotified;
|
|
|
|
CheckedInt64 mLength;
|
|
bool mNegativeSize;
|
|
|
|
nsTArray<nsCOMPtr<nsIAsyncInputStreamLength>> mPendingStreams;
|
|
};
|
|
|
|
NS_IMPL_ISUPPORTS(nsMultiplexInputStream::AsyncWaitLengthHelper,
|
|
nsIInputStreamLengthCallback)
|
|
|
|
NS_IMETHODIMP
|
|
nsMultiplexInputStream::AsyncLengthWait(nsIInputStreamLengthCallback* aCallback,
|
|
nsIEventTarget* aEventTarget) {
|
|
if (NS_WARN_IF(!aEventTarget)) {
|
|
return NS_ERROR_NULL_POINTER;
|
|
}
|
|
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (mCurrentStream > 0 || mStartedReadingCurrent) {
|
|
return NS_ERROR_NOT_AVAILABLE;
|
|
}
|
|
|
|
if (!aCallback) {
|
|
mAsyncWaitLengthCallback = nullptr;
|
|
return NS_OK;
|
|
}
|
|
|
|
// We have a pending operation! Let's use this instead of creating a new one.
|
|
if (mAsyncWaitLengthHelper) {
|
|
mAsyncWaitLengthCallback = aCallback;
|
|
return NS_OK;
|
|
}
|
|
|
|
RefPtr<AsyncWaitLengthHelper> helper = new AsyncWaitLengthHelper();
|
|
|
|
for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
|
|
nsCOMPtr<nsIAsyncInputStreamLength> asyncStream =
|
|
do_QueryInterface(mStreams[i].mBufferedStream);
|
|
if (asyncStream) {
|
|
if (NS_WARN_IF(!helper->AddStream(asyncStream))) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
continue;
|
|
}
|
|
|
|
nsCOMPtr<nsIInputStreamLength> stream =
|
|
do_QueryInterface(mStreams[i].mBufferedStream);
|
|
if (!stream) {
|
|
// Let's use available as fallback.
|
|
uint64_t streamAvail = 0;
|
|
nsresult rv = AvailableMaybeSeek(mStreams[i], &streamAvail);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
continue;
|
|
}
|
|
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
mStatus = rv;
|
|
return mStatus;
|
|
}
|
|
|
|
if (NS_WARN_IF(!helper->AddSize(streamAvail))) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
|
|
continue;
|
|
}
|
|
|
|
int64_t size = 0;
|
|
nsresult rv = stream->Length(&size);
|
|
if (rv == NS_BASE_STREAM_CLOSED) {
|
|
continue;
|
|
}
|
|
|
|
MOZ_ASSERT(rv != NS_BASE_STREAM_WOULD_BLOCK,
|
|
"A nsILengthInutStream returns NS_BASE_STREAM_WOULD_BLOCK but "
|
|
"it doesn't implement nsIAsyncInputStreamLength.");
|
|
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
if (size == -1) {
|
|
helper->NegativeSize();
|
|
break;
|
|
}
|
|
|
|
if (NS_WARN_IF(!helper->AddSize(size))) {
|
|
return NS_ERROR_OUT_OF_MEMORY;
|
|
}
|
|
}
|
|
|
|
nsresult rv = helper->Proceed(this, aEventTarget, lock);
|
|
if (NS_WARN_IF(NS_FAILED(rv))) {
|
|
return rv;
|
|
}
|
|
|
|
mAsyncWaitLengthHelper = helper;
|
|
mAsyncWaitLengthCallback = aCallback;
|
|
return NS_OK;
|
|
}
|
|
|
|
void nsMultiplexInputStream::AsyncWaitCompleted(
|
|
int64_t aLength, const MutexAutoLock& aProofOfLock) {
|
|
nsCOMPtr<nsIInputStreamLengthCallback> callback;
|
|
callback.swap(mAsyncWaitLengthCallback);
|
|
|
|
mAsyncWaitLengthHelper = nullptr;
|
|
|
|
// Already canceled.
|
|
if (!callback) {
|
|
return;
|
|
}
|
|
|
|
MutexAutoUnlock unlock(mLock);
|
|
callback->OnInputStreamLengthReady(this, aLength);
|
|
}
|
|
|
|
#define MAYBE_UPDATE_VALUE_REAL(x, y) \
|
|
if (y) { \
|
|
++x; \
|
|
}
|
|
|
|
#define MAYBE_UPDATE_VALUE(x, y) \
|
|
{ \
|
|
nsCOMPtr<y> substream = do_QueryInterface(aStream.mBufferedStream); \
|
|
MAYBE_UPDATE_VALUE_REAL(x, substream) \
|
|
}
|
|
|
|
void nsMultiplexInputStream::UpdateQIMap(StreamData& aStream) {
|
|
MAYBE_UPDATE_VALUE_REAL(mSeekableStreams, aStream.mSeekableStream)
|
|
MAYBE_UPDATE_VALUE(mIPCSerializableStreams, nsIIPCSerializableInputStream)
|
|
MAYBE_UPDATE_VALUE(mCloneableStreams, nsICloneableInputStream)
|
|
MAYBE_UPDATE_VALUE_REAL(mAsyncInputStreams, aStream.mAsyncStream)
|
|
MAYBE_UPDATE_VALUE(mInputStreamLengths, nsIInputStreamLength)
|
|
MAYBE_UPDATE_VALUE(mAsyncInputStreamLengths, nsIAsyncInputStreamLength)
|
|
}
|
|
|
|
#undef MAYBE_UPDATE_VALUE
|
|
|
|
bool nsMultiplexInputStream::IsSeekable() const {
|
|
return mStreams.Length() == mSeekableStreams;
|
|
}
|
|
|
|
bool nsMultiplexInputStream::IsIPCSerializable() const {
|
|
return mStreams.Length() == mIPCSerializableStreams;
|
|
}
|
|
|
|
bool nsMultiplexInputStream::IsCloneable() const {
|
|
return mStreams.Length() == mCloneableStreams;
|
|
}
|
|
|
|
bool nsMultiplexInputStream::IsAsyncInputStream() const {
|
|
// nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
|
|
// substream implements that interface.
|
|
return !!mAsyncInputStreams;
|
|
}
|
|
|
|
bool nsMultiplexInputStream::IsInputStreamLength() const {
|
|
return !!mInputStreamLengths;
|
|
}
|
|
|
|
bool nsMultiplexInputStream::IsAsyncInputStreamLength() const {
|
|
return !!mAsyncInputStreamLengths;
|
|
}
|