Bug 1767517 - Part 2: Annotate nsPipe for the thread safety analysis, r=jesup,xpcom-reviewers,KrisWright

Differential Revision: https://phabricator.services.mozilla.com/D145394
This commit is contained in:
Nika Layzell 2022-05-16 20:40:49 +00:00
Родитель ae118208e9
Коммит 59f3f40567
1 изменённых файлов: 89 добавлений и 42 удалений

Просмотреть файл

@ -159,18 +159,22 @@ struct nsPipeReadState {
mActiveRead(false),
mNeedDrain(false) {}
char* mReadCursor;
char* mReadLimit;
int32_t mSegment;
uint32_t mAvailable;
// All members of this type are guarded by the pipe monitor, however it cannot
// be named from this type, so the less-reliable GUARDED_VAR is used instead.
// In the future it would be nice to avoid this, especially as GUARDED_VAR is
// deprecated.
char* mReadCursor GUARDED_VAR;
char* mReadLimit GUARDED_VAR;
int32_t mSegment GUARDED_VAR;
uint32_t mAvailable GUARDED_VAR;
// This flag is managed using the AutoReadSegment RAII stack class.
bool mActiveRead;
bool mActiveRead GUARDED_VAR;
// Set to indicate that the input stream has closed and should be drained,
// but that drain has been delayed due to an active read. When the read
// completes, this flag indicate the drain should then be performed.
bool mNeedDrain;
bool mNeedDrain GUARDED_VAR;
};
//-----------------------------------------------------------------------------
@ -211,10 +215,9 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
mReadState(aOther.mReadState),
mPriority(nsIRunnablePriority::PRIORITY_NORMAL) {}
nsresult Fill();
void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
uint32_t Available();
uint32_t Available() REQUIRES(Monitor());
// synchronously wait for the pipe to become readable.
nsresult Wait();
@ -223,9 +226,11 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
// expect their caller to have done so and to pass the monitor as
// evidence.
MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
const ReentrantMonitorAutoEnter& ev);
const ReentrantMonitorAutoEnter& ev)
REQUIRES(Monitor());
MonitorAction OnInputException(nsresult, nsPipeEvents&,
const ReentrantMonitorAutoEnter& ev);
const ReentrantMonitorAutoEnter& ev)
REQUIRES(Monitor());
nsPipeReadState& ReadState() { return mReadState; }
@ -234,15 +239,19 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
nsresult Status() const;
// A version of Status() that doesn't acquire the monitor.
nsresult Status(const ReentrantMonitorAutoEnter& ev) const;
nsresult Status(const ReentrantMonitorAutoEnter& ev) const
REQUIRES(Monitor());
// The status of this input stream, ignoring the status of the underlying
// monitor. If this status is errored, the input stream has either already
// been removed from the pipe, or will be removed from the pipe shortly.
nsresult InputStatus(const ReentrantMonitorAutoEnter&) const {
nsresult InputStatus(const ReentrantMonitorAutoEnter&) const
REQUIRES(Monitor()) {
return mInputStatus;
}
ReentrantMonitor& Monitor() const;
private:
virtual ~nsPipeInputStream();
@ -252,14 +261,15 @@ class nsPipeInputStream final : public nsIAsyncInputStream,
// Individual input streams can be closed without effecting the rest of the
// pipe. So track individual input stream status separately. |mInputStatus|
// is protected by |mPipe->mReentrantMonitor|.
nsresult mInputStatus;
nsresult mInputStatus GUARDED_BY(Monitor());
bool mBlocking;
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
CallbackHolder mCallback;
bool mBlocked GUARDED_BY(Monitor());
CallbackHolder mCallback GUARDED_BY(Monitor());
// requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
// requires pipe's monitor to access members; usually treat as an opaque token
// to pass to nsPipe
nsPipeReadState mReadState;
Atomic<uint32_t, Relaxed> mPriority;
};
@ -289,13 +299,17 @@ class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
mWritable(true) {}
void SetNonBlocking(bool aNonBlocking) { mBlocking = !aNonBlocking; }
void SetWritable(bool aWritable) { mWritable = aWritable; }
void SetWritable(bool aWritable) REQUIRES(Monitor()) {
mWritable = aWritable;
}
// synchronously wait for the pipe to become writable.
nsresult Wait();
MonitorAction OnOutputWritable(nsPipeEvents&);
MonitorAction OnOutputException(nsresult, nsPipeEvents&);
MonitorAction OnOutputWritable(nsPipeEvents&) REQUIRES(Monitor());
MonitorAction OnOutputException(nsresult, nsPipeEvents&) REQUIRES(Monitor());
ReentrantMonitor& Monitor() const;
private:
nsPipe* mPipe;
@ -306,9 +320,9 @@ class nsPipeOutputStream : public nsIAsyncOutputStream, public nsIClassInfo {
bool mBlocking;
// these variables can only be accessed while inside the pipe's monitor
bool mBlocked;
bool mWritable;
CallbackHolder mCallback;
bool mBlocked GUARDED_BY(Monitor());
bool mWritable GUARDED_BY(Monitor());
CallbackHolder mCallback GUARDED_BY(Monitor());
};
//-----------------------------------------------------------------------------
@ -336,19 +350,23 @@ class nsPipe final {
//
void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
char*& aCursor, char*& aLimit);
char*& aCursor, char*& aLimit) REQUIRES(mReentrantMonitor);
SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter& ev);
bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
uint32_t CountSegmentReferences(int32_t aSegment);
void SetAllNullReadCursors();
bool AllReadCursorsMatchWriteCursor();
void RollBackAllReadCursors(char* aWriteCursor);
void UpdateAllReadCursors(char* aWriteCursor);
void ValidateAllReadCursors();
const ReentrantMonitorAutoEnter& ev)
REQUIRES(mReentrantMonitor);
bool ReadSegmentBeingWritten(nsPipeReadState& aReadState)
REQUIRES(mReentrantMonitor);
uint32_t CountSegmentReferences(int32_t aSegment) REQUIRES(mReentrantMonitor);
void SetAllNullReadCursors() REQUIRES(mReentrantMonitor);
bool AllReadCursorsMatchWriteCursor() REQUIRES(mReentrantMonitor);
void RollBackAllReadCursors(char* aWriteCursor) REQUIRES(mReentrantMonitor);
void UpdateAllReadCursors(char* aWriteCursor) REQUIRES(mReentrantMonitor);
void ValidateAllReadCursors() REQUIRES(mReentrantMonitor);
uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
const ReentrantMonitorAutoEnter& ev) const;
bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
const ReentrantMonitorAutoEnter& ev) const
REQUIRES(mReentrantMonitor);
bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
REQUIRES(mReentrantMonitor);
//
// methods below may be called while outside the pipe's monitor
@ -379,26 +397,44 @@ class nsPipe final {
// a weak reference as the streams will clear their entry here in their
// destructor. Using a strong reference would create a reference cycle.
// Only usable while mReentrantMonitor is locked.
nsTArray<nsPipeInputStream*> mInputList;
nsTArray<nsPipeInputStream*> mInputList GUARDED_BY(mReentrantMonitor);
ReentrantMonitor mReentrantMonitor MOZ_UNANNOTATED;
nsSegmentedBuffer mBuffer;
ReentrantMonitor mReentrantMonitor;
nsSegmentedBuffer mBuffer GUARDED_BY(mReentrantMonitor);
// The maximum number of segments to allow to be buffered in advance
// of the fastest reader. This is collection of segments is called
// the "advance buffer".
uint32_t mMaxAdvanceBufferSegmentCount;
uint32_t mMaxAdvanceBufferSegmentCount GUARDED_BY(mReentrantMonitor);
int32_t mWriteSegment;
char* mWriteCursor;
char* mWriteLimit;
int32_t mWriteSegment GUARDED_BY(mReentrantMonitor);
char* mWriteCursor GUARDED_BY(mReentrantMonitor);
char* mWriteLimit GUARDED_BY(mReentrantMonitor);
// |mStatus| is protected by |mReentrantMonitor|.
nsresult mStatus;
nsresult mStatus GUARDED_BY(mReentrantMonitor);
};
//-----------------------------------------------------------------------------
// Declarations of Monitor() methods on the streams.
//
// These must be placed early to provide RETURN_CAPABILITY annotations for the
// thread-safety analysis. This couldn't be done at the declaration due to
// nsPipe not yet being defined.
ReentrantMonitor& nsPipeOutputStream::Monitor() const
RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
return mPipe->mReentrantMonitor;
}
ReentrantMonitor& nsPipeInputStream::Monitor() const
RETURN_CAPABILITY(mPipe->mReentrantMonitor) {
return mPipe->mReentrantMonitor;
}
//-----------------------------------------------------------------------------
// RAII class representing an active read segment. When it goes out of scope
// it automatically updates the read cursor and releases the read segment.
class MOZ_STACK_CLASS AutoReadSegment final {
@ -626,6 +662,7 @@ void nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState,
!ReadSegmentBeingWritten(aReadState)) {
// Advance the segment position. If we have read any segments from the
// advance buffer then we can potentially notify blocked writers.
mOutput.Monitor().AssertCurrentThreadIn();
if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
mOutput.OnOutputWritable(events) == NotifyMonitor) {
mon.NotifyAll();
@ -752,6 +789,7 @@ void nsPipe::DrainInputStream(nsPipeReadState& aReadState,
// If we have read any segments from the advance buffer then we can
// potentially notify blocked writers.
mOutput.Monitor().AssertCurrentThreadIn();
if (!IsAdvanceBufferFull(mon) &&
mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
mon.NotifyAll();
@ -834,12 +872,14 @@ void nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) {
// update the writable flag on the output stream
if (mWriteCursor == mWriteLimit) {
mOutput.Monitor().AssertCurrentThreadIn();
mOutput.SetWritable(!IsAdvanceBufferFull(mon));
}
// notify input stream that pipe now contains additional data
bool needNotify = false;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
mInputList[i]->Monitor().AssertCurrentThreadIn();
if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon) ==
NotifyMonitor) {
needNotify = true;
@ -881,6 +921,7 @@ void nsPipe::OnInputStreamException(nsPipeInputStream* aStream,
continue;
}
mInputList[i]->Monitor().AssertCurrentThreadIn();
MonitorAction action =
mInputList[i]->OnInputException(aReason, events, mon);
@ -917,6 +958,7 @@ void nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) {
for (uint32_t i = 0; i < list.Length(); ++i) {
// an output-only exception applies to the input end if the pipe has
// zero bytes available.
list[i]->Monitor().AssertCurrentThreadIn();
if (aOutputOnly && list[i]->Available()) {
continue;
}
@ -926,6 +968,7 @@ void nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) {
}
}
mOutput.Monitor().AssertCurrentThreadIn();
if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
needNotify = true;
}
@ -942,6 +985,7 @@ nsresult nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
ReentrantMonitorAutoEnter mon(mReentrantMonitor);
RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
// don't add clones of closed pipes to mInputList.
ref->Monitor().AssertCurrentThreadIn();
if (NS_SUCCEEDED(ref->InputStatus(mon))) {
mInputList.AppendElement(ref);
}
@ -1074,6 +1118,7 @@ bool nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const {
uint32_t minBufferSegments = UINT32_MAX;
for (uint32_t i = 0; i < mInputList.Length(); ++i) {
// Only count buffer segments from input streams that are open.
mInputList[i]->Monitor().AssertCurrentThreadIn();
if (NS_FAILED(mInputList[i]->Status(ev))) {
continue;
}
@ -1750,10 +1795,12 @@ nsresult NS_NewPipe(nsIInputStream** aPipeIn, nsIOutputStream** aPipeOut,
return NS_OK;
}
// Disable thread safety analysis as this is logically a constructor, and no
// additional threads can observe these objects yet.
nsresult NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
nsIAsyncOutputStream** aPipeOut, bool aNonBlockingInput,
bool aNonBlockingOutput, uint32_t aSegmentSize,
uint32_t aSegmentCount) {
uint32_t aSegmentCount) NO_THREAD_SAFETY_ANALYSIS {
RefPtr<nsPipe> pipe =
new nsPipe(aSegmentSize ? aSegmentSize : DEFAULT_SEGMENT_SIZE,
aSegmentCount ? aSegmentCount : DEFAULT_SEGMENT_COUNT);