зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1761548 - Add basic locking to nsStorageStream, r=xpcom-reviewers,mccr8
This should make it slightly safer to to try to read from a nsStorageInputStream while the stream is being concurrently written to on another thread, though it will still not behave very well as there is no nsIAsyncInputStream support. Individual input streams are still non-threadsafe with this change. Differential Revision: https://phabricator.services.mozilla.com/D142135
This commit is contained in:
Родитель
ebcaefe356
Коммит
789c8553d9
|
@ -12,6 +12,7 @@
|
|||
* with the attendant performance loss and heap fragmentation.
|
||||
*/
|
||||
|
||||
#include "mozilla/Mutex.h"
|
||||
#include "nsAlgorithm.h"
|
||||
#include "nsStorageStream.h"
|
||||
#include "nsSegmentedBuffer.h"
|
||||
|
@ -27,8 +28,7 @@
|
|||
#include "mozilla/MathAlgorithms.h"
|
||||
#include "mozilla/ipc/InputStreamUtils.h"
|
||||
|
||||
using mozilla::Maybe;
|
||||
using mozilla::Some;
|
||||
using mozilla::MutexAutoLock;
|
||||
using mozilla::ipc::InputStreamParams;
|
||||
using mozilla::ipc::StringInputStreamParams;
|
||||
|
||||
|
@ -49,15 +49,7 @@ static mozilla::LazyLogModule sStorageStreamLog("nsStorageStream");
|
|||
#endif
|
||||
#define LOG(args) MOZ_LOG(sStorageStreamLog, mozilla::LogLevel::Debug, args)
|
||||
|
||||
nsStorageStream::nsStorageStream()
|
||||
: mSegmentedBuffer(0),
|
||||
mSegmentSize(0),
|
||||
mSegmentSizeLog2(0),
|
||||
mWriteInProgress(false),
|
||||
mLastSegmentNum(-1),
|
||||
mWriteCursor(0),
|
||||
mSegmentEnd(0),
|
||||
mLogicalLength(0) {
|
||||
nsStorageStream::nsStorageStream() {
|
||||
LOG(("Creating nsStorageStream [%p].\n", this));
|
||||
}
|
||||
|
||||
|
@ -67,6 +59,7 @@ NS_IMPL_ISUPPORTS(nsStorageStream, nsIStorageStream, nsIOutputStream)
|
|||
|
||||
NS_IMETHODIMP
|
||||
nsStorageStream::Init(uint32_t aSegmentSize, uint32_t aMaxSize) {
|
||||
MutexAutoLock lock(mMutex);
|
||||
mSegmentedBuffer = new nsSegmentedBuffer();
|
||||
mSegmentSize = aSegmentSize;
|
||||
mSegmentSizeLog2 = mozilla::FloorLog2(aSegmentSize);
|
||||
|
@ -85,6 +78,8 @@ nsStorageStream::GetOutputStream(int32_t aStartingOffset,
|
|||
if (NS_WARN_IF(!aOutputStream)) {
|
||||
return NS_ERROR_INVALID_ARG;
|
||||
}
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
if (NS_WARN_IF(!mSegmentedBuffer)) {
|
||||
return NS_ERROR_NOT_INITIALIZED;
|
||||
}
|
||||
|
@ -93,6 +88,10 @@ nsStorageStream::GetOutputStream(int32_t aStartingOffset,
|
|||
return NS_ERROR_NOT_AVAILABLE;
|
||||
}
|
||||
|
||||
if (mActiveSegmentBorrows > 0) {
|
||||
return NS_ERROR_NOT_AVAILABLE;
|
||||
}
|
||||
|
||||
nsresult rv = Seek(aStartingOffset);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
|
@ -118,6 +117,7 @@ nsStorageStream::GetOutputStream(int32_t aStartingOffset,
|
|||
|
||||
NS_IMETHODIMP
|
||||
nsStorageStream::Close() {
|
||||
MutexAutoLock lock(mMutex);
|
||||
if (NS_WARN_IF(!mSegmentedBuffer)) {
|
||||
return NS_ERROR_NOT_INITIALIZED;
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ nsStorageStream::Close() {
|
|||
|
||||
// Shrink the final segment in the segmented buffer to the minimum size
|
||||
// needed to contain the data, so as to conserve memory.
|
||||
if (segmentOffset) {
|
||||
if (segmentOffset && !mActiveSegmentBorrows) {
|
||||
mSegmentedBuffer->ReallocLastSegment(segmentOffset);
|
||||
}
|
||||
|
||||
|
@ -150,19 +150,29 @@ nsStorageStream::Write(const char* aBuffer, uint32_t aCount,
|
|||
if (NS_WARN_IF(!aNumWritten) || NS_WARN_IF(!aBuffer)) {
|
||||
return NS_ERROR_INVALID_ARG;
|
||||
}
|
||||
|
||||
MutexAutoLock lock(mMutex);
|
||||
if (NS_WARN_IF(!mSegmentedBuffer)) {
|
||||
return NS_ERROR_NOT_INITIALIZED;
|
||||
}
|
||||
|
||||
const char* readCursor;
|
||||
uint32_t count, availableInSegment, remaining;
|
||||
nsresult rv = NS_OK;
|
||||
|
||||
LOG(("nsStorageStream [%p] Write mWriteCursor=%p mSegmentEnd=%p aCount=%d\n",
|
||||
this, mWriteCursor, mSegmentEnd, aCount));
|
||||
|
||||
remaining = aCount;
|
||||
readCursor = aBuffer;
|
||||
uint32_t remaining = aCount;
|
||||
const char* readCursor = aBuffer;
|
||||
|
||||
auto onExit = mozilla::MakeScopeExit([&] {
|
||||
mMutex.AssertCurrentThreadOwns();
|
||||
*aNumWritten = aCount - remaining;
|
||||
mLogicalLength += *aNumWritten;
|
||||
|
||||
LOG(
|
||||
("nsStorageStream [%p] Wrote mWriteCursor=%p mSegmentEnd=%p "
|
||||
"numWritten=%d\n",
|
||||
this, mWriteCursor, mSegmentEnd, *aNumWritten));
|
||||
});
|
||||
|
||||
// If no segments have been created yet, create one even if we don't have
|
||||
// to write any data; this enables creating an input stream which reads from
|
||||
// the very end of the data for any amount of data in the stream (i.e.
|
||||
|
@ -172,13 +182,12 @@ nsStorageStream::Write(const char* aBuffer, uint32_t aCount,
|
|||
bool firstTime = mSegmentedBuffer->GetSegmentCount() == 0;
|
||||
while (remaining || MOZ_UNLIKELY(firstTime)) {
|
||||
firstTime = false;
|
||||
availableInSegment = mSegmentEnd - mWriteCursor;
|
||||
uint32_t availableInSegment = mSegmentEnd - mWriteCursor;
|
||||
if (!availableInSegment) {
|
||||
mWriteCursor = mSegmentedBuffer->AppendNewSegment();
|
||||
if (!mWriteCursor) {
|
||||
mSegmentEnd = 0;
|
||||
rv = NS_ERROR_OUT_OF_MEMORY;
|
||||
goto out;
|
||||
return NS_ERROR_OUT_OF_MEMORY;
|
||||
}
|
||||
mLastSegmentNum++;
|
||||
mSegmentEnd = mWriteCursor + mSegmentSize;
|
||||
|
@ -189,7 +198,7 @@ nsStorageStream::Write(const char* aBuffer, uint32_t aCount,
|
|||
this, mWriteCursor, mSegmentEnd));
|
||||
}
|
||||
|
||||
count = XPCOM_MIN(availableInSegment, remaining);
|
||||
uint32_t count = XPCOM_MIN(availableInSegment, remaining);
|
||||
memcpy(mWriteCursor, readCursor, count);
|
||||
remaining -= count;
|
||||
readCursor += count;
|
||||
|
@ -200,15 +209,7 @@ nsStorageStream::Write(const char* aBuffer, uint32_t aCount,
|
|||
this, mWriteCursor, mSegmentEnd, count));
|
||||
}
|
||||
|
||||
out:
|
||||
*aNumWritten = aCount - remaining;
|
||||
mLogicalLength += *aNumWritten;
|
||||
|
||||
LOG(
|
||||
("nsStorageStream [%p] Wrote mWriteCursor=%p mSegmentEnd=%p "
|
||||
"numWritten=%d\n",
|
||||
this, mWriteCursor, mSegmentEnd, *aNumWritten));
|
||||
return rv;
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
NS_IMETHODIMP
|
||||
|
@ -231,13 +232,19 @@ nsStorageStream::IsNonBlocking(bool* aNonBlocking) {
|
|||
|
||||
NS_IMETHODIMP
|
||||
nsStorageStream::GetLength(uint32_t* aLength) {
|
||||
MutexAutoLock lock(mMutex);
|
||||
*aLength = mLogicalLength;
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
// Truncate the buffer by deleting the end segments
|
||||
NS_IMETHODIMP
|
||||
nsStorageStream::SetLength(uint32_t aLength) {
|
||||
MutexAutoLock lock(mMutex);
|
||||
return SetLengthLocked(aLength);
|
||||
}
|
||||
|
||||
// Truncate the buffer by deleting the end segments
|
||||
nsresult nsStorageStream::SetLengthLocked(uint32_t aLength) {
|
||||
if (NS_WARN_IF(!mSegmentedBuffer)) {
|
||||
return NS_ERROR_NOT_INITIALIZED;
|
||||
}
|
||||
|
@ -246,6 +253,10 @@ nsStorageStream::SetLength(uint32_t aLength) {
|
|||
return NS_ERROR_NOT_AVAILABLE;
|
||||
}
|
||||
|
||||
if (mActiveSegmentBorrows) {
|
||||
return NS_ERROR_NOT_AVAILABLE;
|
||||
}
|
||||
|
||||
if (aLength > mLogicalLength) {
|
||||
return NS_ERROR_INVALID_ARG;
|
||||
}
|
||||
|
@ -267,6 +278,7 @@ nsStorageStream::SetLength(uint32_t aLength) {
|
|||
|
||||
NS_IMETHODIMP
|
||||
nsStorageStream::GetWriteInProgress(bool* aWriteInProgress) {
|
||||
MutexAutoLock lock(mMutex);
|
||||
*aWriteInProgress = mWriteInProgress;
|
||||
return NS_OK;
|
||||
}
|
||||
|
@ -287,7 +299,7 @@ nsresult nsStorageStream::Seek(int32_t aPosition) {
|
|||
}
|
||||
|
||||
// Seeking backwards in the write stream results in truncation
|
||||
SetLength(aPosition);
|
||||
SetLengthLocked(aPosition);
|
||||
|
||||
// Special handling for seek to start-of-buffer
|
||||
if (aPosition == 0) {
|
||||
|
@ -346,7 +358,7 @@ class nsStorageInputStream final : public nsIInputStream,
|
|||
~nsStorageInputStream() = default;
|
||||
|
||||
protected:
|
||||
nsresult Seek(uint32_t aPosition);
|
||||
nsresult Seek(uint32_t aPosition) REQUIRES(mStorageStream->mMutex);
|
||||
|
||||
friend class nsStorageStream;
|
||||
|
||||
|
@ -359,7 +371,7 @@ class nsStorageInputStream final : public nsIInputStream,
|
|||
uint32_t mLogicalCursor; // Logical offset into stream
|
||||
nsresult mStatus;
|
||||
|
||||
uint32_t SegNum(uint32_t aPosition) {
|
||||
uint32_t SegNum(uint32_t aPosition) REQUIRES(mStorageStream->mMutex) {
|
||||
return aPosition >> mStorageStream->mSegmentSizeLog2;
|
||||
}
|
||||
uint32_t SegOffset(uint32_t aPosition) {
|
||||
|
@ -378,6 +390,7 @@ NS_IMPL_ISUPPORTS(nsStorageInputStream, nsIInputStream, nsISeekableStream,
|
|||
NS_IMETHODIMP
|
||||
nsStorageStream::NewInputStream(int32_t aStartingOffset,
|
||||
nsIInputStream** aInputStream) {
|
||||
MutexAutoLock lock(mMutex);
|
||||
if (NS_WARN_IF(!mSegmentedBuffer)) {
|
||||
return NS_ERROR_NOT_INITIALIZED;
|
||||
}
|
||||
|
@ -385,6 +398,7 @@ nsStorageStream::NewInputStream(int32_t aStartingOffset,
|
|||
RefPtr<nsStorageInputStream> inputStream =
|
||||
new nsStorageInputStream(this, mSegmentSize);
|
||||
|
||||
inputStream->mStorageStream->mMutex.AssertCurrentThreadOwns();
|
||||
nsresult rv = inputStream->Seek(aStartingOffset);
|
||||
if (NS_FAILED(rv)) {
|
||||
return rv;
|
||||
|
@ -406,6 +420,7 @@ nsStorageInputStream::Available(uint64_t* aAvailable) {
|
|||
return mStatus;
|
||||
}
|
||||
|
||||
MutexAutoLock lock(mStorageStream->mMutex);
|
||||
*aAvailable = mStorageStream->mLogicalLength - mLogicalCursor;
|
||||
return NS_OK;
|
||||
}
|
||||
|
@ -431,27 +446,36 @@ nsStorageInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
|
|||
|
||||
remainingCapacity = aCount;
|
||||
while (remainingCapacity) {
|
||||
availableInSegment = mSegmentEnd - mReadCursor;
|
||||
if (!availableInSegment) {
|
||||
uint32_t available = mStorageStream->mLogicalLength - mLogicalCursor;
|
||||
if (!available) {
|
||||
goto out;
|
||||
}
|
||||
const char* cur = nullptr;
|
||||
{
|
||||
MutexAutoLock lock(mStorageStream->mMutex);
|
||||
availableInSegment = mSegmentEnd - mReadCursor;
|
||||
if (!availableInSegment) {
|
||||
uint32_t available = mStorageStream->mLogicalLength - mLogicalCursor;
|
||||
if (!available) {
|
||||
break;
|
||||
}
|
||||
|
||||
// We have data in the stream, but if mSegmentEnd is zero, then we
|
||||
// were likely constructed prior to any data being written into
|
||||
// the stream. Therefore, if mSegmentEnd is non-zero, we should
|
||||
// move into the next segment; otherwise, we should stay in this
|
||||
// segment so our input state can be updated and we can properly
|
||||
// perform the initial read.
|
||||
if (mSegmentEnd > 0) {
|
||||
mSegmentNum++;
|
||||
// We have data in the stream, but if mSegmentEnd is zero, then we
|
||||
// were likely constructed prior to any data being written into
|
||||
// the stream. Therefore, if mSegmentEnd is non-zero, we should
|
||||
// move into the next segment; otherwise, we should stay in this
|
||||
// segment so our input state can be updated and we can properly
|
||||
// perform the initial read.
|
||||
if (mSegmentEnd > 0) {
|
||||
mSegmentNum++;
|
||||
}
|
||||
mReadCursor = 0;
|
||||
mSegmentEnd = XPCOM_MIN(mSegmentSize, available);
|
||||
availableInSegment = mSegmentEnd;
|
||||
}
|
||||
mReadCursor = 0;
|
||||
mSegmentEnd = XPCOM_MIN(mSegmentSize, available);
|
||||
availableInSegment = mSegmentEnd;
|
||||
cur = mStorageStream->mSegmentedBuffer->GetSegment(mSegmentNum);
|
||||
mStorageStream->mActiveSegmentBorrows++;
|
||||
}
|
||||
const char* cur = mStorageStream->mSegmentedBuffer->GetSegment(mSegmentNum);
|
||||
auto dropBorrow = mozilla::MakeScopeExit([&] {
|
||||
MutexAutoLock lock(mStorageStream->mMutex);
|
||||
mStorageStream->mActiveSegmentBorrows--;
|
||||
});
|
||||
|
||||
count = XPCOM_MIN(availableInSegment, remainingCapacity);
|
||||
rv = aWriter(this, aClosure, cur + mReadCursor, aCount - remainingCapacity,
|
||||
|
@ -464,7 +488,6 @@ nsStorageInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
|
|||
mLogicalCursor += bytesConsumed;
|
||||
}
|
||||
|
||||
out:
|
||||
*aNumRead = aCount - remainingCapacity;
|
||||
|
||||
bool isWriteInProgress = false;
|
||||
|
@ -494,6 +517,7 @@ nsStorageInputStream::Seek(int32_t aWhence, int64_t aOffset) {
|
|||
return mStatus;
|
||||
}
|
||||
|
||||
MutexAutoLock lock(mStorageStream->mMutex);
|
||||
int64_t pos = aOffset;
|
||||
|
||||
switch (aWhence) {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "nsIOutputStream.h"
|
||||
#include "nsMemory.h"
|
||||
#include "mozilla/Attributes.h"
|
||||
#include "mozilla/Mutex.h"
|
||||
|
||||
#define NS_STORAGESTREAM_CID \
|
||||
{ /* 669a9795-6ff7-4ed4-9150-c34ce2971b63 */ \
|
||||
|
@ -44,21 +45,32 @@ class nsStorageStream final : public nsIStorageStream, public nsIOutputStream {
|
|||
private:
|
||||
~nsStorageStream();
|
||||
|
||||
nsSegmentedBuffer* mSegmentedBuffer;
|
||||
uint32_t
|
||||
mSegmentSize; // All segments, except possibly the last, are of this size
|
||||
// Must be power-of-2
|
||||
uint32_t mSegmentSizeLog2; // log2(mSegmentSize)
|
||||
bool mWriteInProgress; // true, if an un-Close'ed output stream exists
|
||||
int32_t mLastSegmentNum; // Last segment # in use, -1 initially
|
||||
char* mWriteCursor; // Pointer to next byte to be written
|
||||
char* mSegmentEnd; // Pointer to one byte after end of segment
|
||||
// containing the write cursor
|
||||
uint32_t mLogicalLength; // Number of bytes written to stream
|
||||
mozilla::Mutex mMutex{"nsStorageStream"};
|
||||
nsSegmentedBuffer* mSegmentedBuffer GUARDED_BY(mMutex) = nullptr;
|
||||
// All segments, except possibly the last, are of this size. Must be
|
||||
// power-of-2
|
||||
uint32_t mSegmentSize GUARDED_BY(mMutex) = 0;
|
||||
// log2(mSegmentSize)
|
||||
uint32_t mSegmentSizeLog2 GUARDED_BY(mMutex) = 0;
|
||||
// true, if an un-Close'ed output stream exists
|
||||
bool mWriteInProgress GUARDED_BY(mMutex) = false;
|
||||
// Last segment # in use, -1 initially
|
||||
int32_t mLastSegmentNum GUARDED_BY(mMutex) = -1;
|
||||
// Pointer to next byte to be written
|
||||
char* mWriteCursor GUARDED_BY(mMutex) = nullptr;
|
||||
// Pointer to one byte after end of segment containing the write cursor
|
||||
char* mSegmentEnd GUARDED_BY(mMutex) = nullptr;
|
||||
// Number of bytes written to stream
|
||||
uint32_t mLogicalLength GUARDED_BY(mMutex) = 0;
|
||||
// number of input streams actively reading a segment.
|
||||
uint32_t mActiveSegmentBorrows GUARDED_BY(mMutex) = 0;
|
||||
|
||||
nsresult Seek(int32_t aPosition);
|
||||
uint32_t SegNum(uint32_t aPosition) { return aPosition >> mSegmentSizeLog2; }
|
||||
uint32_t SegOffset(uint32_t aPosition) {
|
||||
nsresult SetLengthLocked(uint32_t aLength) REQUIRES(mMutex);
|
||||
nsresult Seek(int32_t aPosition) REQUIRES(mMutex);
|
||||
uint32_t SegNum(uint32_t aPosition) REQUIRES(mMutex) {
|
||||
return aPosition >> mSegmentSizeLog2;
|
||||
}
|
||||
uint32_t SegOffset(uint32_t aPosition) REQUIRES(mMutex) {
|
||||
return aPosition & (mSegmentSize - 1);
|
||||
}
|
||||
};
|
||||
|
|
Загрузка…
Ссылка в новой задаче