diff --git a/netwerk/protocol/http/Http2Push.cpp b/netwerk/protocol/http/Http2Push.cpp index 7c05e39fef27..e17eeefb728c 100644 --- a/netwerk/protocol/http/Http2Push.cpp +++ b/netwerk/protocol/http/Http2Push.cpp @@ -166,7 +166,8 @@ Http2PushedStream::ReadSegments(nsAHttpSegmentReader *, // the write side of a pushed transaction just involves manipulating a little state SetSentFin(true); - Http2Stream::mAllHeadersSent = 1; + Http2Stream::mRequestHeadersDone = 1; + Http2Stream::mOpenGenerated = 1; Http2Stream::ChangeState(UPSTREAM_COMPLETE); *count = 0; return NS_OK; diff --git a/netwerk/protocol/http/Http2Session.cpp b/netwerk/protocol/http/Http2Session.cpp index ac5332cbf9ba..c06730db3b98 100644 --- a/netwerk/protocol/http/Http2Session.cpp +++ b/netwerk/protocol/http/Http2Session.cpp @@ -438,13 +438,15 @@ Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction, mStreamTransactionHash.Put(aHttpTransaction, stream); - if (RoomForMoreConcurrent()) { - LOG3(("Http2Session::AddStream %p stream %p activated immediately.", - this, stream)); - ActivateStream(stream); - } else { - LOG3(("Http2Session::AddStream %p stream %p queued.", this, stream)); - mQueuedStreams.Push(stream); + mReadyForWrite.Push(stream); + SetWriteCallbacks(); + + // Kick off the SYN transmit without waiting for the poll loop + // This won't work for the first stream because there is no segment reader + // yet. + if (mSegmentReader) { + uint32_t countRead; + ReadSegments(nullptr, kDefaultBufferSize, &countRead); } if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) && @@ -458,32 +460,14 @@ Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction, } void -Http2Session::ActivateStream(Http2Stream *stream) +Http2Session::QueueStream(Http2Stream *stream) { + // will be removed via processpending or a shutdown path MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), - "Do not activate pushed streams"); - MOZ_ASSERT(!stream->CountAsActive()); - stream->SetCountAsActive(true); - ++mConcurrent; - if (mConcurrent > mConcurrentHighWater) - mConcurrentHighWater = mConcurrent; - LOG3(("Http2Session::AddStream %p activating stream %p Currently %d " - "streams in session, high water mark is %d", - this, stream, mConcurrent, mConcurrentHighWater)); - - mReadyForWrite.Push(stream); - SetWriteCallbacks(); - - // Kick off the headers transmit without waiting for the poll loop - // This won't work for stream id=1 because there is no segment reader - // yet. - if (mSegmentReader) { - uint32_t countRead; - ReadSegments(nullptr, kDefaultBufferSize, &countRead); - } + LOG3(("Http2Session::QueueStream %p stream %p queued.", this, stream)); + mQueuedStreams.Push(stream); } void @@ -491,13 +475,15 @@ Http2Session::ProcessPending() { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - while (RoomForMoreConcurrent()) { - Http2Stream *stream = static_cast(mQueuedStreams.PopFront()); - if (!stream) - return; - LOG3(("Http2Session::ProcessPending %p stream %p activated from queue.", + Http2Stream*stream; + while (RoomForMoreConcurrent() && + (stream = static_cast(mQueuedStreams.PopFront()))) { + + LOG3(("Http2Session::ProcessPending %p stream %p woken from queue.", this, stream)); - ActivateStream(stream); + MOZ_ASSERT(!stream->CountAsActive()); + mReadyForWrite.Push(stream); + SetWriteCallbacks(); } } @@ -616,6 +602,44 @@ Http2Session::ResetDownstreamState() mInputFrameDataStream = nullptr; } +// return true if activated (and counted against max) +// otherwise return false and queue +bool +Http2Session::TryToActivate(Http2Stream *aStream) +{ + if (!RoomForMoreConcurrent()) { + LOG3(("Http2Session::TryToActivate %p stream=%p no room for more concurrent " + "streams %d\n", this, aStream)); + QueueStream(aStream); + return false; + } + IncrementConcurrent(aStream); + return true; +} + +void +Http2Session::IncrementConcurrent(Http2Stream *stream) +{ + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); + MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), + "Do not activate pushed streams"); + + nsAHttpTransaction *trans = stream->Transaction(); + if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) { + + MOZ_ASSERT(!stream->CountAsActive()); + stream->SetCountAsActive(true); + ++mConcurrent; + + if (mConcurrent > mConcurrentHighWater) { + mConcurrentHighWater = mConcurrent; + } + LOG3(("Http2Session::IncrementCounter %p counting stream %p Currently %d " + "streams in session, high water mark is %d\n", + this, stream, mConcurrent, mConcurrentHighWater)); + } +} + // call with data length (i.e. 0 for 0 data bytes - ignore 9 byte header) // dest must have 9 bytes of allocated space template void @@ -656,6 +680,7 @@ Http2Session::CreateFrameHeader(uint8_t *dest, uint16_t frameLength, void Http2Session::MaybeDecrementConcurrent(Http2Stream *aStream) { + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); LOG3(("MaybeDecrementConcurrent %p id=0x%X concurrent=%d active=%d\n", this, aStream->StreamID(), mConcurrent, aStream->CountAsActive())); @@ -1450,6 +1475,7 @@ Http2Session::RecvSettings(Http2Session *self) case SETTINGS_TYPE_MAX_CONCURRENT: self->mMaxConcurrent = value; Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value); + self->ProcessPending(); break; case SETTINGS_TYPE_INITIAL_WINDOW: diff --git a/netwerk/protocol/http/Http2Session.h b/netwerk/protocol/http/Http2Session.h index 1f3433cf12e7..655f0abe0a83 100644 --- a/netwerk/protocol/http/Http2Session.h +++ b/netwerk/protocol/http/Http2Session.h @@ -205,8 +205,8 @@ public: uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; } + bool TryToActivate(Http2Stream *stream); void ConnectPushedStream(Http2Stream *stream); - void MaybeDecrementConcurrent(Http2Stream *stream); nsresult ConfirmTLSProfile(); static bool ALPNCallback(nsISupports *securityInfo); @@ -266,8 +266,6 @@ private: void SetWriteCallbacks(); void RealignOutputQueue(); - bool RoomForMoreConcurrent(); - void ActivateStream(Http2Stream *); void ProcessPending(); nsresult SetInputFrameDataStream(uint32_t); void CreatePriorityNode(uint32_t, uint32_t, uint8_t, const char *); @@ -278,6 +276,11 @@ private: void UpdateLocalStreamWindow(Http2Stream *stream, uint32_t bytes); void UpdateLocalSessionWindow(uint32_t bytes); + void MaybeDecrementConcurrent(Http2Stream *stream); + bool RoomForMoreConcurrent(); + void IncrementConcurrent(Http2Stream *stream); + void QueueStream(Http2Stream *stream); + // a wrapper for all calls to the nshttpconnection level segment writer. Used // to track network I/O for timeout purposes nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *); diff --git a/netwerk/protocol/http/Http2Stream.cpp b/netwerk/protocol/http/Http2Stream.cpp index 0de1dc866b87..2497263d273d 100644 --- a/netwerk/protocol/http/Http2Stream.cpp +++ b/netwerk/protocol/http/Http2Stream.cpp @@ -46,7 +46,8 @@ Http2Stream::Http2Stream(nsAHttpTransaction *httpTransaction, , mSession(session) , mUpstreamState(GENERATING_HEADERS) , mState(IDLE) - , mAllHeadersSent(0) + , mRequestHeadersDone(0) + , mOpenGenerated(0) , mAllHeadersReceived(0) , mTransaction(httpTransaction) , mSocketTransport(session->SocketTransport()) @@ -154,7 +155,7 @@ Http2Stream::ReadSegments(nsAHttpSegmentReader *reader, // If not, mark the stream for callback when writing can proceed. if (NS_SUCCEEDED(rv) && mUpstreamState == GENERATING_HEADERS && - !mAllHeadersSent) + !mRequestHeadersDone) mSession->TransactionHasDataToWrite(this); // mTxinlineFrameUsed represents any queued un-sent frame. It might @@ -303,16 +304,17 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf, uint32_t *countUsed) { // Returns NS_OK even if the headers are incomplete - // set mAllHeadersSent flag if they are complete + // set mRequestHeadersDone flag if they are complete MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(mUpstreamState == GENERATING_HEADERS); + MOZ_ASSERT(!mRequestHeadersDone); LOG3(("Http2Stream::ParseHttpRequestHeaders %p avail=%d state=%x", this, avail, mUpstreamState)); mFlatHttpRequestHeaders.Append(buf, avail); - nsHttpRequestHead *head = mTransaction->RequestHead(); + const nsHttpRequestHead *head = mTransaction->RequestHead(); // We can use the simple double crlf because firefox is the // only client we are parsing @@ -333,7 +335,7 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf, uint32_t oldLen = mFlatHttpRequestHeaders.Length(); mFlatHttpRequestHeaders.SetLength(endHeader + 2); *countUsed = avail - (oldLen - endHeader) + 4; - mAllHeadersSent = 1; + mRequestHeadersDone = 1; nsAutoCString authorityHeader; nsAutoCString hashkey; @@ -388,28 +390,32 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf, SetSentFin(true); AdjustPushedPriority(); - // This stream has been activated (and thus counts against the concurrency - // limit intentionally), but will not be registered via - // RegisterStreamID (below) because of the push match. - // Release that semaphore count immediately (instead of waiting for - // cleanup stream) so we can initiate more pull streams. - mSession->MaybeDecrementConcurrent(this); - // There is probably pushed data buffered so trigger a read manually // as we can't rely on future network events to do it mSession->ConnectPushedStream(this); + mOpenGenerated = 1; return NS_OK; } } + return NS_OK; +} +// This is really a headers frame, but open is pretty clear from a workflow pov +nsresult +Http2Stream::GenerateOpen() +{ // It is now OK to assign a streamID that we are assured will // be monotonically increasing amongst new streams on this // session mStreamID = mSession->RegisterStreamID(this); MOZ_ASSERT(mStreamID & 1, "Http2 Stream Channel ID must be odd"); - LOG3(("Stream ID 0x%X [session=%p] for URI %s\n", - mStreamID, mSession, - nsCString(head->RequestURI()).get())); + MOZ_ASSERT(!mOpenGenerated); + + mOpenGenerated = 1; + + const nsHttpRequestHead *head = mTransaction->RequestHead(); + LOG3(("Http2Stream %p Stream ID 0x%X [session=%p] for URI %s\n", + this, mStreamID, mSession, nsCString(head->RequestURI()).get())); if (mStreamID >= 0x80000000) { // streamID must fit in 31 bits. Evading This is theoretically possible @@ -428,6 +434,9 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf, // of HTTP/2 headers by writing to mTxInlineFrame{sz} nsCString compressedData; + nsAutoCString authorityHeader; + head->GetHeader(nsHttp::Host, authorityHeader); + nsDependentCString scheme(head->IsHTTPS() ? "https" : "http"); if (head->IsConnect()) { MOZ_ASSERT(mTransaction->QuerySpdyConnectTransaction()); @@ -440,6 +449,7 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf, if (!ci) { return NS_ERROR_UNEXPECTED; } + authorityHeader = ci->GetHost(); authorityHeader.Append(':'); authorityHeader.AppendInt(ci->Port()); @@ -1200,12 +1210,26 @@ Http2Stream::OnReadSegment(const char *buf, // the number of those bytes that we consume (i.e. the portion that are // header bytes) - rv = ParseHttpRequestHeaders(buf, count, countRead); - if (NS_FAILED(rv)) - return rv; - LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d", - this, *countRead, count, mAllHeadersSent)); - if (mAllHeadersSent) { + if (!mRequestHeadersDone) { + if (NS_FAILED(rv = ParseHttpRequestHeaders(buf, count, countRead))) { + return rv; + } + } + + if (mRequestHeadersDone && !mOpenGenerated) { + if (!mSession->TryToActivate(this)) { + LOG3(("Http2Stream::OnReadSegment %p cannot activate now. queued.\n", this)); + return NS_OK; + } + if (NS_FAILED(rv = GenerateOpen())) { + return rv; + } + } + + LOG3(("ParseHttpRequestHeaders %p used %d of %d. " + "requestheadersdone = %d mOpenGenerated = %d\n", + this, *countRead, count, mRequestHeadersDone, mOpenGenerated)); + if (mOpenGenerated) { SetHTTPState(OPEN); AdjustInitialWindow(); // This version of TransmitFrame cannot block diff --git a/netwerk/protocol/http/Http2Stream.h b/netwerk/protocol/http/Http2Stream.h index 172fe59947b8..1828cb4a69a4 100644 --- a/netwerk/protocol/http/Http2Stream.h +++ b/netwerk/protocol/http/Http2Stream.h @@ -178,10 +178,13 @@ protected: // The HTTP/2 state for the stream from section 5.1 enum stateType mState; - // Flag is set when all http request headers have been read and ID is stable - uint32_t mAllHeadersSent : 1; + // Flag is set when all http request headers have been read ID is not stable + uint32_t mRequestHeadersDone : 1; - // Flag is set when all http request headers have been read and ID is stable + // Flag is set when ID is stable and concurrency limits are met + uint32_t mOpenGenerated : 1; + + // Flag is set when all http response headers have been read uint32_t mAllHeadersReceived : 1; void ChangeState(enum upstreamStateType); @@ -190,6 +193,8 @@ private: friend class nsAutoPtr; nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *); + nsresult GenerateOpen(); + void AdjustPushedPriority(); void AdjustInitialWindow(); nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment); diff --git a/netwerk/protocol/http/SpdyPush31.cpp b/netwerk/protocol/http/SpdyPush31.cpp index 6c6869c31197..8fb7d795b594 100644 --- a/netwerk/protocol/http/SpdyPush31.cpp +++ b/netwerk/protocol/http/SpdyPush31.cpp @@ -106,7 +106,8 @@ SpdyPushedStream31::ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *co // the write side of a pushed transaction just involves manipulating a little state SpdyStream31::mSentFinOnData = 1; - SpdyStream31::mSynFrameComplete = 1; + SpdyStream31::mRequestHeadersDone = 1; + SpdyStream31::mSynFrameGenerated = 1; SpdyStream31::ChangeState(UPSTREAM_COMPLETE); *count = 0; return NS_OK; diff --git a/netwerk/protocol/http/SpdySession31.cpp b/netwerk/protocol/http/SpdySession31.cpp index 0647e02723b8..c4a4efd18ea2 100644 --- a/netwerk/protocol/http/SpdySession31.cpp +++ b/netwerk/protocol/http/SpdySession31.cpp @@ -379,14 +379,15 @@ SpdySession31::AddStream(nsAHttpTransaction *aHttpTransaction, mStreamTransactionHash.Put(aHttpTransaction, stream); - if (RoomForMoreConcurrent()) { - LOG3(("SpdySession31::AddStream %p stream %p activated immediately.", - this, stream)); - ActivateStream(stream); - } - else { - LOG3(("SpdySession31::AddStream %p stream %p queued.", this, stream)); - mQueuedStreams.Push(stream); + mReadyForWrite.Push(stream); + SetWriteCallbacks(); + + // Kick off the SYN transmit without waiting for the poll loop + // This won't work for stream id=1 because there is no segment reader + // yet. + if (mSegmentReader) { + uint32_t countRead; + ReadSegments(nullptr, kDefaultBufferSize, &countRead); } if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) && @@ -400,33 +401,14 @@ SpdySession31::AddStream(nsAHttpTransaction *aHttpTransaction, } void -SpdySession31::ActivateStream(SpdyStream31 *stream) +SpdySession31::QueueStream(SpdyStream31 *stream) { + // will be removed via processpending or a shutdown path MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), - "Do not activate pushed streams"); + MOZ_ASSERT(!stream->CountAsActive()); - nsAHttpTransaction *trans = stream->Transaction(); - if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) { - ++mConcurrent; - if (mConcurrent > mConcurrentHighWater) { - mConcurrentHighWater = mConcurrent; - } - LOG3(("SpdySession31::AddStream %p activating stream %p Currently %d " - "streams in session, high water mark is %d", - this, stream, mConcurrent, mConcurrentHighWater)); - } - - mReadyForWrite.Push(stream); - SetWriteCallbacks(); - - // Kick off the SYN transmit without waiting for the poll loop - // This won't work for stream id=1 because there is no segment reader - // yet. - if (mSegmentReader) { - uint32_t countRead; - ReadSegments(nullptr, kDefaultBufferSize, &countRead); - } + LOG3(("SpdySession31::QueueStream %p stream %p queued.", this, stream)); + mQueuedStreams.Push(stream); } void @@ -434,13 +416,15 @@ SpdySession31::ProcessPending() { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - while (RoomForMoreConcurrent()) { - SpdyStream31 *stream = static_cast(mQueuedStreams.PopFront()); - if (!stream) - return; - LOG3(("SpdySession31::ProcessPending %p stream %p activated from queue.", + SpdyStream31 *stream; + while (RoomForMoreConcurrent() && + (stream = static_cast(mQueuedStreams.PopFront()))) { + + LOG3(("SpdySession31::ProcessPending %p stream %p woken from queue.", this, stream)); - ActivateStream(stream); + MOZ_ASSERT(!stream->CountAsActive()); + mReadyForWrite.Push(stream); + SetWriteCallbacks(); } } @@ -561,18 +545,60 @@ SpdySession31::ResetDownstreamState() mInputFrameDataStream = nullptr; } +// return true if activated (and counted against max) +// otherwise return false and queue +bool +SpdySession31::TryToActivate(SpdyStream31 *aStream) +{ + if (!RoomForMoreConcurrent()) { + LOG3(("SpdySession31::TryToActivate %p stream=%p no room for more concurrent " + "streams %d\n", this, aStream)); + QueueStream(aStream); + return false; + } + IncrementConcurrent(aStream); + return true; +} + +void +SpdySession31::IncrementConcurrent(SpdyStream31 *stream) +{ + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); + MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), + "Do not activate pushed streams"); + + nsAHttpTransaction *trans = stream->Transaction(); + if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) { + + MOZ_ASSERT(!stream->CountAsActive()); + stream->SetCountAsActive(true); + ++mConcurrent; + + if (mConcurrent > mConcurrentHighWater) { + mConcurrentHighWater = mConcurrent; + } + LOG3(("SpdySession31::AddStream %p counting stream %p Currently %d " + "streams in session, high water mark is %d", + this, stream, mConcurrent, mConcurrentHighWater)); + } +} + void SpdySession31::DecrementConcurrent(SpdyStream31 *aStream) { - uint32_t id = aStream->StreamID(); + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - if (id && !(id & 0x1)) - return; // pushed streams aren't counted in concurrent limit + if (!aStream->CountAsActive()) { + return; + } MOZ_ASSERT(mConcurrent); + aStream->SetCountAsActive(false); --mConcurrent; + LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n", - this, id, mConcurrent)); + this, aStream->StreamID(), mConcurrent)); + ProcessPending(); } @@ -1414,6 +1440,7 @@ SpdySession31::HandleSettings(SpdySession31 *self) case SETTINGS_TYPE_MAX_CONCURRENT: self->mMaxConcurrent = value; Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value); + self->ProcessPending(); break; case SETTINGS_TYPE_CWND: diff --git a/netwerk/protocol/http/SpdySession31.h b/netwerk/protocol/http/SpdySession31.h index 6192ca019881..595b06f4b96c 100644 --- a/netwerk/protocol/http/SpdySession31.h +++ b/netwerk/protocol/http/SpdySession31.h @@ -178,6 +178,7 @@ public: uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; } + bool TryToActivate(SpdyStream31 *stream); void ConnectPushedStream(SpdyStream31 *stream); void DecrementConcurrent(SpdyStream31 *stream); @@ -223,8 +224,6 @@ private: void SetWriteCallbacks(); void RealignOutputQueue(); - bool RoomForMoreConcurrent(); - void ActivateStream(SpdyStream31 *); void ProcessPending(); nsresult SetInputFrameDataStream(uint32_t); bool VerifyStream(SpdyStream31 *, uint32_t); @@ -234,6 +233,10 @@ private: void UpdateLocalStreamWindow(SpdyStream31 *stream, uint32_t bytes); void UpdateLocalSessionWindow(uint32_t bytes); + bool RoomForMoreConcurrent(); + void IncrementConcurrent(SpdyStream31 *stream); + void QueueStream(SpdyStream31 *stream); + // a wrapper for all calls to the nshttpconnection level segment writer. Used // to track network I/O for timeout purposes nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *); diff --git a/netwerk/protocol/http/SpdyStream31.cpp b/netwerk/protocol/http/SpdyStream31.cpp index 068720575ae6..42242853c50f 100644 --- a/netwerk/protocol/http/SpdyStream31.cpp +++ b/netwerk/protocol/http/SpdyStream31.cpp @@ -42,7 +42,8 @@ SpdyStream31::SpdyStream31(nsAHttpTransaction *httpTransaction, : mStreamID(0) , mSession(spdySession) , mUpstreamState(GENERATING_SYN_STREAM) - , mSynFrameComplete(0) + , mRequestHeadersDone(0) + , mSynFrameGenerated(0) , mSentFinOnData(0) , mTransaction(httpTransaction) , mSocketTransport(spdySession->SocketTransport()) @@ -55,6 +56,7 @@ SpdyStream31::SpdyStream31(nsAHttpTransaction *httpTransaction, , mSentWaitingFor(0) , mReceivedData(0) , mSetTCPSocketBuffer(0) + , mCountAsActive(0) , mTxInlineFrameSize(SpdySession31::kDefaultBufferSize) , mTxInlineFrameUsed(0) , mTxStreamFrameSize(0) @@ -129,7 +131,7 @@ SpdyStream31::ReadSegments(nsAHttpSegmentReader *reader, // If not, mark the stream for callback when writing can proceed. if (NS_SUCCEEDED(rv) && mUpstreamState == GENERATING_SYN_STREAM && - !mSynFrameComplete) + !mRequestHeadersDone) mSession->TransactionHasDataToWrite(this); // mTxinlineFrameUsed represents any queued un-sent frame. It might @@ -259,10 +261,11 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf, uint32_t *countUsed) { // Returns NS_OK even if the headers are incomplete - // set mSynFrameComplete flag if they are complete + // set mRequestHeadersDone flag if they are complete MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(mUpstreamState == GENERATING_SYN_STREAM); + MOZ_ASSERT(!mRequestHeadersDone); LOG3(("SpdyStream31::ParseHttpRequestHeaders %p avail=%d state=%x", this, avail, mUpstreamState)); @@ -288,7 +291,7 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf, uint32_t oldLen = mFlatHttpRequestHeaders.Length(); mFlatHttpRequestHeaders.SetLength(endHeader + 2); *countUsed = avail - (oldLen - endHeader) + 4; - mSynFrameComplete = 1; + mRequestHeadersDone = 1; nsAutoCString hostHeader; nsAutoCString hashkey; @@ -330,15 +333,24 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf, // There is probably pushed data buffered so trigger a read manually // as we can't rely on future network events to do it mSession->ConnectPushedStream(this); + mSynFrameGenerated = 1; return NS_OK; } } + return NS_OK; +} +nsresult +SpdyStream31::GenerateSynFrame() +{ // It is now OK to assign a streamID that we are assured will // be monotonically increasing amongst syn-streams on this // session mStreamID = mSession->RegisterStreamID(this); MOZ_ASSERT(mStreamID & 1, "Spdy Stream Channel ID must be odd"); + MOZ_ASSERT(!mSynFrameGenerated); + + mSynFrameGenerated = 1; if (mStreamID >= 0x80000000) { // streamID must fit in 31 bits. This is theoretically possible @@ -507,6 +519,8 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf, CompressToFrame(NS_LITERAL_CSTRING(":version")); CompressToFrame(versionHeader); + nsAutoCString hostHeader; + mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader); CompressToFrame(NS_LITERAL_CSTRING(":host")); CompressToFrame(hostHeader); @@ -1455,12 +1469,26 @@ SpdyStream31::OnReadSegment(const char *buf, // the number of those bytes that we consume (i.e. the portion that are // header bytes) - rv = ParseHttpRequestHeaders(buf, count, countRead); - if (NS_FAILED(rv)) - return rv; - LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d", - this, *countRead, count, mSynFrameComplete)); - if (mSynFrameComplete) { + if (!mRequestHeadersDone) { + if (NS_FAILED(rv = ParseHttpRequestHeaders(buf, count, countRead))) { + return rv; + } + } + + if (mRequestHeadersDone && !mSynFrameGenerated) { + if (!mSession->TryToActivate(this)) { + LOG3(("SpdyStream31::OnReadSegment %p cannot activate now. queued.\n", this)); + return NS_OK; + } + if (NS_FAILED(rv = GenerateSynFrame())) { + return rv; + } + } + + LOG3(("ParseHttpRequestHeaders %p used %d of %d. " + "requestheadersdone = %d mSynFrameGenerated = %d\n", + this, *countRead, count, mRequestHeadersDone, mSynFrameGenerated)); + if (mSynFrameGenerated) { AdjustInitialWindow(); rv = TransmitFrame(nullptr, nullptr, true); if (rv == NS_BASE_STREAM_WOULD_BLOCK) { diff --git a/netwerk/protocol/http/SpdyStream31.h b/netwerk/protocol/http/SpdyStream31.h index 77d12c8b2222..5fc584be8868 100644 --- a/netwerk/protocol/http/SpdyStream31.h +++ b/netwerk/protocol/http/SpdyStream31.h @@ -55,6 +55,9 @@ public: void SetRecvdData(bool aStatus) { mReceivedData = aStatus ? 1 : 0; } bool RecvdData() { return mReceivedData; } + void SetCountAsActive(bool aStatus) { mCountAsActive = aStatus ? 1 : 0; } + bool CountAsActive() { return mCountAsActive; } + void UpdateTransportSendEvents(uint32_t count); void UpdateTransportReadEvents(uint32_t count); @@ -118,8 +121,11 @@ protected: // sending_request_body for each SPDY chunk in the upload. enum stateType mUpstreamState; - // Flag is set when all http request headers have been read and ID is stable - uint32_t mSynFrameComplete : 1; + // Flag is set when all http request headers have been read + uint32_t mRequestHeadersDone : 1; + + // Flag is set when stream ID is stable + uint32_t mSynFrameGenerated : 1; // Flag is set when a FIN has been placed on a data or syn packet // (i.e after the client has closed) @@ -135,6 +141,8 @@ private: void *); nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *); + nsresult GenerateSynFrame(); + void AdjustInitialWindow(); nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment); void GenerateDataFrameHeader(uint32_t, bool); @@ -185,6 +193,9 @@ private: // Flag is set after TCP send autotuning has been disabled uint32_t mSetTCPSocketBuffer : 1; + // Flag is set when stream is counted towards MAX_CONCURRENT streams in session + uint32_t mCountAsActive : 1; + // The InlineFrame and associated data is used for composing control // frames and data frame headers. nsAutoArrayPtr mTxInlineFrame;