diff --git a/xpcom/io/nsPipe3.cpp b/xpcom/io/nsPipe3.cpp index 4ba97158e2e8..cda8f2dbbe3c 100644 --- a/xpcom/io/nsPipe3.cpp +++ b/xpcom/io/nsPipe3.cpp @@ -54,6 +54,22 @@ class nsPipeEvents; class nsPipeInputStream; class nsPipeOutputStream; +namespace { + +enum MonitorAction +{ + DoNotNotifyMonitor, + NotifyMonitor +}; + +enum SegmentChangeResult +{ + SegmentNotChanged, + SegmentDeleted +}; + +} // anonymous namespace + //----------------------------------------------------------------------------- // this class is used to delay notifications until the end of a particular @@ -160,10 +176,8 @@ public: // synchronously wait for the pipe to become readable. nsresult Wait(); - // these functions return true to indicate that the pipe's monitor should - // be notified, to wake up a blocked reader if any. - bool OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&); - bool OnInputException(nsresult, nsPipeEvents&); + MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&); + MonitorAction OnInputException(nsresult, nsPipeEvents&); nsPipeReadState& ReadState() { @@ -239,10 +253,8 @@ public: // synchronously wait for the pipe to become writable. nsresult Wait(); - // these functions return true to indicate that the pipe's monitor should - // be notified, to wake up a blocked writer if any. - bool OnOutputWritable(nsPipeEvents&); - bool OnOutputException(nsresult, nsPipeEvents&); + MonitorAction OnOutputWritable(nsPipeEvents&); + MonitorAction OnOutputException(nsresult, nsPipeEvents&); private: nsPipe* mPipe; @@ -292,6 +304,10 @@ public: const char*& aSegment, uint32_t& aSegmentLen); void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount, uint32_t* aAvailableOut); + SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState); + void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents, + uint32_t* aAvailableOut); + bool ReadSegmentBeingWritten(nsPipeReadState& aReadState); nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen); void AdvanceWriteCursor(uint32_t aCount); @@ -540,60 +556,110 @@ nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead, // if still writing in this segment then bail because we're not done // with the segment and have to wait for now... - if (mWriteSegment == aReadState.mSegment && mWriteLimit > mWriteCursor) { - NS_ASSERTION(aReadState.mReadLimit == mWriteCursor, "unexpected state"); + if (ReadSegmentBeingWritten(aReadState)) { return; } - uint32_t currentSegment = aReadState.mSegment; - - // Move to the next segment to read - aReadState.mSegment += 1; - - // If this was the last reference to the first segment, then remove it. - if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { - - // shift write and read segment index (-1 indicates an empty buffer). - mWriteSegment -= 1; - - for (uint32_t i = 0; i < mInputList.Length(); ++i) { - mInputList[i]->ReadState().mSegment -= 1; - } - - // done with this segment - mBuffer.DeleteFirstSegment(); - LOG(("III deleting first segment\n")); - } - - if (mWriteSegment < aReadState.mSegment) { - // read cursor has hit the end of written data, so reset it - MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); - aReadState.mReadCursor = nullptr; - aReadState.mReadLimit = nullptr; - // also, the buffer is completely empty, so reset the write cursor - if (mWriteSegment == -1) { - mWriteCursor = nullptr; - mWriteLimit = nullptr; - } - } else { - // advance read cursor and limit to next buffer segment - aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); - if (mWriteSegment == aReadState.mSegment) { - aReadState.mReadLimit = mWriteCursor; - } else { - aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); - } - } - - // we've free'd up a segment, so notify output stream that pipe has - // room for a new segment. - if (mOutput.OnOutputWritable(events)) { - mon.Notify(); + // Check to see if we can free up any segments. If we can, then notify + // the output stream that the pipe has room for a new segment. + if (AdvanceReadSegment(aReadState) == SegmentDeleted && + mOutput.OnOutputWritable(events) == NotifyMonitor) { + mon.NotifyAll(); } } } } +SegmentChangeResult +nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState) +{ + int32_t currentSegment = aReadState.mSegment; + + // Move to the next segment to read + aReadState.mSegment += 1; + + SegmentChangeResult result = SegmentNotChanged; + + // If this was the last reference to the first segment, then remove it. + if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) { + + // shift write and read segment index (-1 indicates an empty buffer). + mWriteSegment -= 1; + + for (uint32_t i = 0; i < mInputList.Length(); ++i) { + mInputList[i]->ReadState().mSegment -= 1; + } + + // done with this segment + mBuffer.DeleteFirstSegment(); + LOG(("III deleting first segment\n")); + + result = SegmentDeleted; + } + + if (mWriteSegment < aReadState.mSegment) { + // read cursor has hit the end of written data, so reset it + MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1)); + aReadState.mReadCursor = nullptr; + aReadState.mReadLimit = nullptr; + // also, the buffer is completely empty, so reset the write cursor + if (mWriteSegment == -1) { + mWriteCursor = nullptr; + mWriteLimit = nullptr; + } + } else { + // advance read cursor and limit to next buffer segment + aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment); + if (mWriteSegment == aReadState.mSegment) { + aReadState.mReadLimit = mWriteCursor; + } else { + aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize(); + } + } + + return result; +} + +void +nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents, + uint32_t* aAvailableOut) +{ + ReentrantMonitorAutoEnter mon(mReentrantMonitor); + + *aAvailableOut = 0; + + SegmentChangeResult result = SegmentNotChanged; + while(mWriteSegment >= aReadState.mSegment) { + + // If the last segment to free is still being written to, we're done + // draining. We can't free any more. + if (ReadSegmentBeingWritten(aReadState)) { + break; + } + + if (AdvanceReadSegment(aReadState) == SegmentDeleted) { + result = SegmentDeleted; + } + } + + // if we've free'd up a segment, notify output stream that pipe has + // room for a new segment. + if (result == SegmentDeleted && + mOutput.OnOutputWritable(aEvents) == NotifyMonitor) { + mon.NotifyAll(); + } +} + +bool +nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState) +{ + bool beingWritten = mWriteSegment == aReadState.mSegment && + mWriteLimit > mWriteCursor; + NS_ASSERTION(!beingWritten || aReadState.mReadLimit == mWriteCursor, + "unexpected state"); + return beingWritten; +} + nsresult nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen) { @@ -664,7 +730,7 @@ nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten) // notify input stream that pipe now contains additional data bool needNotify = false; for (uint32_t i = 0; i < mInputList.Length(); ++i) { - if (mInputList[i]->OnInputReadable(aBytesWritten, events)) { + if (mInputList[i]->OnInputReadable(aBytesWritten, events) == NotifyMonitor) { needNotify = true; } } @@ -705,12 +771,12 @@ nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason) continue; } - bool needNotify = mInputList[i]->OnInputException(aReason, events); + MonitorAction action = mInputList[i]->OnInputException(aReason, events); mInputList.RemoveElementAt(i); // Notify after element is removed in case we re-enter as a result. - if (needNotify) { - mon.Notify(); + if (action == NotifyMonitor) { + mon.NotifyAll(); } return; @@ -746,13 +812,13 @@ nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly) continue; } - if (mInputList[i]->OnInputException(aReason, events)) { + if (mInputList[i]->OnInputException(aReason, events) == NotifyMonitor) { needNotify = true; } } mInputList = tmpInputList; - if (mOutput.OnOutputException(aReason, events)) { + if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) { needNotify = true; } @@ -941,10 +1007,10 @@ nsPipeInputStream::Wait() return Status() == NS_BASE_STREAM_CLOSED ? NS_OK : Status(); } -bool +MonitorAction nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents) { - bool result = false; + MonitorAction result = DoNotNotifyMonitor; mAvailable += aBytesWritten; @@ -953,19 +1019,19 @@ nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten, nsPipeEvents& aEvents mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = true; + result = NotifyMonitor; } return result; } -bool +MonitorAction nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents) { LOG(("nsPipeInputStream::OnInputException [this=%x reason=%x]\n", this, aReason)); - bool result = false; + MonitorAction result = DoNotNotifyMonitor; NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); @@ -974,14 +1040,14 @@ nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents) } // force count of available bytes to zero. - mAvailable = 0; + mPipe->DrainInputStream(mReadState, aEvents, &mAvailable); if (mCallback) { aEvents.NotifyInputReady(this, mCallback); mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = true; + result = NotifyMonitor; } return result; @@ -1322,10 +1388,10 @@ nsPipeOutputStream::Wait() return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus; } -bool +MonitorAction nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) { - bool result = false; + MonitorAction result = DoNotNotifyMonitor; mWritable = true; @@ -1334,19 +1400,19 @@ nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents) mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = true; + result = NotifyMonitor; } return result; } -bool +MonitorAction nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) { LOG(("nsPipeOutputStream::OnOutputException [this=%x reason=%x]\n", this, aReason)); - bool result = false; + MonitorAction result = DoNotNotifyMonitor; NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception"); mWritable = false; @@ -1356,7 +1422,7 @@ nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents) mCallback = 0; mCallbackFlags = 0; } else if (mBlocked) { - result = true; + result = NotifyMonitor; } return result;