bug 1072478 - h2 push hit not subject to max_concurrent 2/2 r=hurley

This commit is contained in:
Patrick McManus 2014-10-15 11:51:25 -04:00
Родитель f4c9705ef9
Коммит e7a182ce97
10 изменённых файлов: 249 добавлений и 120 удалений

Просмотреть файл

@ -166,7 +166,8 @@ Http2PushedStream::ReadSegments(nsAHttpSegmentReader *,
// the write side of a pushed transaction just involves manipulating a little state // the write side of a pushed transaction just involves manipulating a little state
SetSentFin(true); SetSentFin(true);
Http2Stream::mAllHeadersSent = 1; Http2Stream::mRequestHeadersDone = 1;
Http2Stream::mOpenGenerated = 1;
Http2Stream::ChangeState(UPSTREAM_COMPLETE); Http2Stream::ChangeState(UPSTREAM_COMPLETE);
*count = 0; *count = 0;
return NS_OK; return NS_OK;

Просмотреть файл

@ -438,13 +438,15 @@ Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction,
mStreamTransactionHash.Put(aHttpTransaction, stream); mStreamTransactionHash.Put(aHttpTransaction, stream);
if (RoomForMoreConcurrent()) { mReadyForWrite.Push(stream);
LOG3(("Http2Session::AddStream %p stream %p activated immediately.", SetWriteCallbacks();
this, stream));
ActivateStream(stream); // Kick off the SYN transmit without waiting for the poll loop
} else { // This won't work for the first stream because there is no segment reader
LOG3(("Http2Session::AddStream %p stream %p queued.", this, stream)); // yet.
mQueuedStreams.Push(stream); if (mSegmentReader) {
uint32_t countRead;
ReadSegments(nullptr, kDefaultBufferSize, &countRead);
} }
if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) && if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) &&
@ -458,32 +460,14 @@ Http2Session::AddStream(nsAHttpTransaction *aHttpTransaction,
} }
void void
Http2Session::ActivateStream(Http2Stream *stream) Http2Session::QueueStream(Http2Stream *stream)
{ {
// will be removed via processpending or a shutdown path
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
"Do not activate pushed streams");
MOZ_ASSERT(!stream->CountAsActive()); MOZ_ASSERT(!stream->CountAsActive());
stream->SetCountAsActive(true);
++mConcurrent;
if (mConcurrent > mConcurrentHighWater) LOG3(("Http2Session::QueueStream %p stream %p queued.", this, stream));
mConcurrentHighWater = mConcurrent; mQueuedStreams.Push(stream);
LOG3(("Http2Session::AddStream %p activating stream %p Currently %d "
"streams in session, high water mark is %d",
this, stream, mConcurrent, mConcurrentHighWater));
mReadyForWrite.Push(stream);
SetWriteCallbacks();
// Kick off the headers transmit without waiting for the poll loop
// This won't work for stream id=1 because there is no segment reader
// yet.
if (mSegmentReader) {
uint32_t countRead;
ReadSegments(nullptr, kDefaultBufferSize, &countRead);
}
} }
void void
@ -491,13 +475,15 @@ Http2Session::ProcessPending()
{ {
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
while (RoomForMoreConcurrent()) { Http2Stream*stream;
Http2Stream *stream = static_cast<Http2Stream *>(mQueuedStreams.PopFront()); while (RoomForMoreConcurrent() &&
if (!stream) (stream = static_cast<Http2Stream *>(mQueuedStreams.PopFront()))) {
return;
LOG3(("Http2Session::ProcessPending %p stream %p activated from queue.", LOG3(("Http2Session::ProcessPending %p stream %p woken from queue.",
this, stream)); this, stream));
ActivateStream(stream); MOZ_ASSERT(!stream->CountAsActive());
mReadyForWrite.Push(stream);
SetWriteCallbacks();
} }
} }
@ -616,6 +602,44 @@ Http2Session::ResetDownstreamState()
mInputFrameDataStream = nullptr; mInputFrameDataStream = nullptr;
} }
// return true if activated (and counted against max)
// otherwise return false and queue
bool
Http2Session::TryToActivate(Http2Stream *aStream)
{
if (!RoomForMoreConcurrent()) {
LOG3(("Http2Session::TryToActivate %p stream=%p no room for more concurrent "
"streams %d\n", this, aStream));
QueueStream(aStream);
return false;
}
IncrementConcurrent(aStream);
return true;
}
void
Http2Session::IncrementConcurrent(Http2Stream *stream)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
"Do not activate pushed streams");
nsAHttpTransaction *trans = stream->Transaction();
if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) {
MOZ_ASSERT(!stream->CountAsActive());
stream->SetCountAsActive(true);
++mConcurrent;
if (mConcurrent > mConcurrentHighWater) {
mConcurrentHighWater = mConcurrent;
}
LOG3(("Http2Session::IncrementCounter %p counting stream %p Currently %d "
"streams in session, high water mark is %d\n",
this, stream, mConcurrent, mConcurrentHighWater));
}
}
// call with data length (i.e. 0 for 0 data bytes - ignore 9 byte header) // call with data length (i.e. 0 for 0 data bytes - ignore 9 byte header)
// dest must have 9 bytes of allocated space // dest must have 9 bytes of allocated space
template<typename charType> void template<typename charType> void
@ -656,6 +680,7 @@ Http2Session::CreateFrameHeader(uint8_t *dest, uint16_t frameLength,
void void
Http2Session::MaybeDecrementConcurrent(Http2Stream *aStream) Http2Session::MaybeDecrementConcurrent(Http2Stream *aStream)
{ {
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
LOG3(("MaybeDecrementConcurrent %p id=0x%X concurrent=%d active=%d\n", LOG3(("MaybeDecrementConcurrent %p id=0x%X concurrent=%d active=%d\n",
this, aStream->StreamID(), mConcurrent, aStream->CountAsActive())); this, aStream->StreamID(), mConcurrent, aStream->CountAsActive()));
@ -1450,6 +1475,7 @@ Http2Session::RecvSettings(Http2Session *self)
case SETTINGS_TYPE_MAX_CONCURRENT: case SETTINGS_TYPE_MAX_CONCURRENT:
self->mMaxConcurrent = value; self->mMaxConcurrent = value;
Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value); Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value);
self->ProcessPending();
break; break;
case SETTINGS_TYPE_INITIAL_WINDOW: case SETTINGS_TYPE_INITIAL_WINDOW:

Просмотреть файл

@ -205,8 +205,8 @@ public:
uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; } uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; }
bool TryToActivate(Http2Stream *stream);
void ConnectPushedStream(Http2Stream *stream); void ConnectPushedStream(Http2Stream *stream);
void MaybeDecrementConcurrent(Http2Stream *stream);
nsresult ConfirmTLSProfile(); nsresult ConfirmTLSProfile();
static bool ALPNCallback(nsISupports *securityInfo); static bool ALPNCallback(nsISupports *securityInfo);
@ -266,8 +266,6 @@ private:
void SetWriteCallbacks(); void SetWriteCallbacks();
void RealignOutputQueue(); void RealignOutputQueue();
bool RoomForMoreConcurrent();
void ActivateStream(Http2Stream *);
void ProcessPending(); void ProcessPending();
nsresult SetInputFrameDataStream(uint32_t); nsresult SetInputFrameDataStream(uint32_t);
void CreatePriorityNode(uint32_t, uint32_t, uint8_t, const char *); void CreatePriorityNode(uint32_t, uint32_t, uint8_t, const char *);
@ -278,6 +276,11 @@ private:
void UpdateLocalStreamWindow(Http2Stream *stream, uint32_t bytes); void UpdateLocalStreamWindow(Http2Stream *stream, uint32_t bytes);
void UpdateLocalSessionWindow(uint32_t bytes); void UpdateLocalSessionWindow(uint32_t bytes);
void MaybeDecrementConcurrent(Http2Stream *stream);
bool RoomForMoreConcurrent();
void IncrementConcurrent(Http2Stream *stream);
void QueueStream(Http2Stream *stream);
// a wrapper for all calls to the nshttpconnection level segment writer. Used // a wrapper for all calls to the nshttpconnection level segment writer. Used
// to track network I/O for timeout purposes // to track network I/O for timeout purposes
nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *); nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *);

Просмотреть файл

@ -46,7 +46,8 @@ Http2Stream::Http2Stream(nsAHttpTransaction *httpTransaction,
, mSession(session) , mSession(session)
, mUpstreamState(GENERATING_HEADERS) , mUpstreamState(GENERATING_HEADERS)
, mState(IDLE) , mState(IDLE)
, mAllHeadersSent(0) , mRequestHeadersDone(0)
, mOpenGenerated(0)
, mAllHeadersReceived(0) , mAllHeadersReceived(0)
, mTransaction(httpTransaction) , mTransaction(httpTransaction)
, mSocketTransport(session->SocketTransport()) , mSocketTransport(session->SocketTransport())
@ -154,7 +155,7 @@ Http2Stream::ReadSegments(nsAHttpSegmentReader *reader,
// If not, mark the stream for callback when writing can proceed. // If not, mark the stream for callback when writing can proceed.
if (NS_SUCCEEDED(rv) && if (NS_SUCCEEDED(rv) &&
mUpstreamState == GENERATING_HEADERS && mUpstreamState == GENERATING_HEADERS &&
!mAllHeadersSent) !mRequestHeadersDone)
mSession->TransactionHasDataToWrite(this); mSession->TransactionHasDataToWrite(this);
// mTxinlineFrameUsed represents any queued un-sent frame. It might // mTxinlineFrameUsed represents any queued un-sent frame. It might
@ -303,16 +304,17 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf,
uint32_t *countUsed) uint32_t *countUsed)
{ {
// Returns NS_OK even if the headers are incomplete // Returns NS_OK even if the headers are incomplete
// set mAllHeadersSent flag if they are complete // set mRequestHeadersDone flag if they are complete
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mUpstreamState == GENERATING_HEADERS); MOZ_ASSERT(mUpstreamState == GENERATING_HEADERS);
MOZ_ASSERT(!mRequestHeadersDone);
LOG3(("Http2Stream::ParseHttpRequestHeaders %p avail=%d state=%x", LOG3(("Http2Stream::ParseHttpRequestHeaders %p avail=%d state=%x",
this, avail, mUpstreamState)); this, avail, mUpstreamState));
mFlatHttpRequestHeaders.Append(buf, avail); mFlatHttpRequestHeaders.Append(buf, avail);
nsHttpRequestHead *head = mTransaction->RequestHead(); const nsHttpRequestHead *head = mTransaction->RequestHead();
// We can use the simple double crlf because firefox is the // We can use the simple double crlf because firefox is the
// only client we are parsing // only client we are parsing
@ -333,7 +335,7 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf,
uint32_t oldLen = mFlatHttpRequestHeaders.Length(); uint32_t oldLen = mFlatHttpRequestHeaders.Length();
mFlatHttpRequestHeaders.SetLength(endHeader + 2); mFlatHttpRequestHeaders.SetLength(endHeader + 2);
*countUsed = avail - (oldLen - endHeader) + 4; *countUsed = avail - (oldLen - endHeader) + 4;
mAllHeadersSent = 1; mRequestHeadersDone = 1;
nsAutoCString authorityHeader; nsAutoCString authorityHeader;
nsAutoCString hashkey; nsAutoCString hashkey;
@ -388,28 +390,32 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf,
SetSentFin(true); SetSentFin(true);
AdjustPushedPriority(); AdjustPushedPriority();
// This stream has been activated (and thus counts against the concurrency
// limit intentionally), but will not be registered via
// RegisterStreamID (below) because of the push match.
// Release that semaphore count immediately (instead of waiting for
// cleanup stream) so we can initiate more pull streams.
mSession->MaybeDecrementConcurrent(this);
// There is probably pushed data buffered so trigger a read manually // There is probably pushed data buffered so trigger a read manually
// as we can't rely on future network events to do it // as we can't rely on future network events to do it
mSession->ConnectPushedStream(this); mSession->ConnectPushedStream(this);
mOpenGenerated = 1;
return NS_OK; return NS_OK;
} }
} }
return NS_OK;
}
// This is really a headers frame, but open is pretty clear from a workflow pov
nsresult
Http2Stream::GenerateOpen()
{
// It is now OK to assign a streamID that we are assured will // It is now OK to assign a streamID that we are assured will
// be monotonically increasing amongst new streams on this // be monotonically increasing amongst new streams on this
// session // session
mStreamID = mSession->RegisterStreamID(this); mStreamID = mSession->RegisterStreamID(this);
MOZ_ASSERT(mStreamID & 1, "Http2 Stream Channel ID must be odd"); MOZ_ASSERT(mStreamID & 1, "Http2 Stream Channel ID must be odd");
LOG3(("Stream ID 0x%X [session=%p] for URI %s\n", MOZ_ASSERT(!mOpenGenerated);
mStreamID, mSession,
nsCString(head->RequestURI()).get())); mOpenGenerated = 1;
const nsHttpRequestHead *head = mTransaction->RequestHead();
LOG3(("Http2Stream %p Stream ID 0x%X [session=%p] for URI %s\n",
this, mStreamID, mSession, nsCString(head->RequestURI()).get()));
if (mStreamID >= 0x80000000) { if (mStreamID >= 0x80000000) {
// streamID must fit in 31 bits. Evading This is theoretically possible // streamID must fit in 31 bits. Evading This is theoretically possible
@ -428,6 +434,9 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf,
// of HTTP/2 headers by writing to mTxInlineFrame{sz} // of HTTP/2 headers by writing to mTxInlineFrame{sz}
nsCString compressedData; nsCString compressedData;
nsAutoCString authorityHeader;
head->GetHeader(nsHttp::Host, authorityHeader);
nsDependentCString scheme(head->IsHTTPS() ? "https" : "http"); nsDependentCString scheme(head->IsHTTPS() ? "https" : "http");
if (head->IsConnect()) { if (head->IsConnect()) {
MOZ_ASSERT(mTransaction->QuerySpdyConnectTransaction()); MOZ_ASSERT(mTransaction->QuerySpdyConnectTransaction());
@ -440,6 +449,7 @@ Http2Stream::ParseHttpRequestHeaders(const char *buf,
if (!ci) { if (!ci) {
return NS_ERROR_UNEXPECTED; return NS_ERROR_UNEXPECTED;
} }
authorityHeader = ci->GetHost(); authorityHeader = ci->GetHost();
authorityHeader.Append(':'); authorityHeader.Append(':');
authorityHeader.AppendInt(ci->Port()); authorityHeader.AppendInt(ci->Port());
@ -1200,12 +1210,26 @@ Http2Stream::OnReadSegment(const char *buf,
// the number of those bytes that we consume (i.e. the portion that are // the number of those bytes that we consume (i.e. the portion that are
// header bytes) // header bytes)
rv = ParseHttpRequestHeaders(buf, count, countRead); if (!mRequestHeadersDone) {
if (NS_FAILED(rv)) if (NS_FAILED(rv = ParseHttpRequestHeaders(buf, count, countRead))) {
return rv; return rv;
LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d", }
this, *countRead, count, mAllHeadersSent)); }
if (mAllHeadersSent) {
if (mRequestHeadersDone && !mOpenGenerated) {
if (!mSession->TryToActivate(this)) {
LOG3(("Http2Stream::OnReadSegment %p cannot activate now. queued.\n", this));
return NS_OK;
}
if (NS_FAILED(rv = GenerateOpen())) {
return rv;
}
}
LOG3(("ParseHttpRequestHeaders %p used %d of %d. "
"requestheadersdone = %d mOpenGenerated = %d\n",
this, *countRead, count, mRequestHeadersDone, mOpenGenerated));
if (mOpenGenerated) {
SetHTTPState(OPEN); SetHTTPState(OPEN);
AdjustInitialWindow(); AdjustInitialWindow();
// This version of TransmitFrame cannot block // This version of TransmitFrame cannot block

Просмотреть файл

@ -178,10 +178,13 @@ protected:
// The HTTP/2 state for the stream from section 5.1 // The HTTP/2 state for the stream from section 5.1
enum stateType mState; enum stateType mState;
// Flag is set when all http request headers have been read and ID is stable // Flag is set when all http request headers have been read ID is not stable
uint32_t mAllHeadersSent : 1; uint32_t mRequestHeadersDone : 1;
// Flag is set when all http request headers have been read and ID is stable // Flag is set when ID is stable and concurrency limits are met
uint32_t mOpenGenerated : 1;
// Flag is set when all http response headers have been read
uint32_t mAllHeadersReceived : 1; uint32_t mAllHeadersReceived : 1;
void ChangeState(enum upstreamStateType); void ChangeState(enum upstreamStateType);
@ -190,6 +193,8 @@ private:
friend class nsAutoPtr<Http2Stream>; friend class nsAutoPtr<Http2Stream>;
nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *); nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *);
nsresult GenerateOpen();
void AdjustPushedPriority(); void AdjustPushedPriority();
void AdjustInitialWindow(); void AdjustInitialWindow();
nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment); nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment);

Просмотреть файл

@ -106,7 +106,8 @@ SpdyPushedStream31::ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *co
// the write side of a pushed transaction just involves manipulating a little state // the write side of a pushed transaction just involves manipulating a little state
SpdyStream31::mSentFinOnData = 1; SpdyStream31::mSentFinOnData = 1;
SpdyStream31::mSynFrameComplete = 1; SpdyStream31::mRequestHeadersDone = 1;
SpdyStream31::mSynFrameGenerated = 1;
SpdyStream31::ChangeState(UPSTREAM_COMPLETE); SpdyStream31::ChangeState(UPSTREAM_COMPLETE);
*count = 0; *count = 0;
return NS_OK; return NS_OK;

Просмотреть файл

@ -379,14 +379,15 @@ SpdySession31::AddStream(nsAHttpTransaction *aHttpTransaction,
mStreamTransactionHash.Put(aHttpTransaction, stream); mStreamTransactionHash.Put(aHttpTransaction, stream);
if (RoomForMoreConcurrent()) { mReadyForWrite.Push(stream);
LOG3(("SpdySession31::AddStream %p stream %p activated immediately.", SetWriteCallbacks();
this, stream));
ActivateStream(stream); // Kick off the SYN transmit without waiting for the poll loop
} // This won't work for stream id=1 because there is no segment reader
else { // yet.
LOG3(("SpdySession31::AddStream %p stream %p queued.", this, stream)); if (mSegmentReader) {
mQueuedStreams.Push(stream); uint32_t countRead;
ReadSegments(nullptr, kDefaultBufferSize, &countRead);
} }
if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) && if (!(aHttpTransaction->Caps() & NS_HTTP_ALLOW_KEEPALIVE) &&
@ -400,33 +401,14 @@ SpdySession31::AddStream(nsAHttpTransaction *aHttpTransaction,
} }
void void
SpdySession31::ActivateStream(SpdyStream31 *stream) SpdySession31::QueueStream(SpdyStream31 *stream)
{ {
// will be removed via processpending or a shutdown path
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), MOZ_ASSERT(!stream->CountAsActive());
"Do not activate pushed streams");
nsAHttpTransaction *trans = stream->Transaction(); LOG3(("SpdySession31::QueueStream %p stream %p queued.", this, stream));
if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) { mQueuedStreams.Push(stream);
++mConcurrent;
if (mConcurrent > mConcurrentHighWater) {
mConcurrentHighWater = mConcurrent;
}
LOG3(("SpdySession31::AddStream %p activating stream %p Currently %d "
"streams in session, high water mark is %d",
this, stream, mConcurrent, mConcurrentHighWater));
}
mReadyForWrite.Push(stream);
SetWriteCallbacks();
// Kick off the SYN transmit without waiting for the poll loop
// This won't work for stream id=1 because there is no segment reader
// yet.
if (mSegmentReader) {
uint32_t countRead;
ReadSegments(nullptr, kDefaultBufferSize, &countRead);
}
} }
void void
@ -434,13 +416,15 @@ SpdySession31::ProcessPending()
{ {
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
while (RoomForMoreConcurrent()) { SpdyStream31 *stream;
SpdyStream31 *stream = static_cast<SpdyStream31 *>(mQueuedStreams.PopFront()); while (RoomForMoreConcurrent() &&
if (!stream) (stream = static_cast<SpdyStream31 *>(mQueuedStreams.PopFront()))) {
return;
LOG3(("SpdySession31::ProcessPending %p stream %p activated from queue.", LOG3(("SpdySession31::ProcessPending %p stream %p woken from queue.",
this, stream)); this, stream));
ActivateStream(stream); MOZ_ASSERT(!stream->CountAsActive());
mReadyForWrite.Push(stream);
SetWriteCallbacks();
} }
} }
@ -561,18 +545,60 @@ SpdySession31::ResetDownstreamState()
mInputFrameDataStream = nullptr; mInputFrameDataStream = nullptr;
} }
// return true if activated (and counted against max)
// otherwise return false and queue
bool
SpdySession31::TryToActivate(SpdyStream31 *aStream)
{
if (!RoomForMoreConcurrent()) {
LOG3(("SpdySession31::TryToActivate %p stream=%p no room for more concurrent "
"streams %d\n", this, aStream));
QueueStream(aStream);
return false;
}
IncrementConcurrent(aStream);
return true;
}
void
SpdySession31::IncrementConcurrent(SpdyStream31 *stream)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
"Do not activate pushed streams");
nsAHttpTransaction *trans = stream->Transaction();
if (!trans || !trans->IsNullTransaction() || trans->QuerySpdyConnectTransaction()) {
MOZ_ASSERT(!stream->CountAsActive());
stream->SetCountAsActive(true);
++mConcurrent;
if (mConcurrent > mConcurrentHighWater) {
mConcurrentHighWater = mConcurrent;
}
LOG3(("SpdySession31::AddStream %p counting stream %p Currently %d "
"streams in session, high water mark is %d",
this, stream, mConcurrent, mConcurrentHighWater));
}
}
void void
SpdySession31::DecrementConcurrent(SpdyStream31 *aStream) SpdySession31::DecrementConcurrent(SpdyStream31 *aStream)
{ {
uint32_t id = aStream->StreamID(); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (id && !(id & 0x1)) if (!aStream->CountAsActive()) {
return; // pushed streams aren't counted in concurrent limit return;
}
MOZ_ASSERT(mConcurrent); MOZ_ASSERT(mConcurrent);
aStream->SetCountAsActive(false);
--mConcurrent; --mConcurrent;
LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n", LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n",
this, id, mConcurrent)); this, aStream->StreamID(), mConcurrent));
ProcessPending(); ProcessPending();
} }
@ -1414,6 +1440,7 @@ SpdySession31::HandleSettings(SpdySession31 *self)
case SETTINGS_TYPE_MAX_CONCURRENT: case SETTINGS_TYPE_MAX_CONCURRENT:
self->mMaxConcurrent = value; self->mMaxConcurrent = value;
Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value); Telemetry::Accumulate(Telemetry::SPDY_SETTINGS_MAX_STREAMS, value);
self->ProcessPending();
break; break;
case SETTINGS_TYPE_CWND: case SETTINGS_TYPE_CWND:

Просмотреть файл

@ -178,6 +178,7 @@ public:
uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; } uint32_t GetServerInitialStreamWindow() { return mServerInitialStreamWindow; }
bool TryToActivate(SpdyStream31 *stream);
void ConnectPushedStream(SpdyStream31 *stream); void ConnectPushedStream(SpdyStream31 *stream);
void DecrementConcurrent(SpdyStream31 *stream); void DecrementConcurrent(SpdyStream31 *stream);
@ -223,8 +224,6 @@ private:
void SetWriteCallbacks(); void SetWriteCallbacks();
void RealignOutputQueue(); void RealignOutputQueue();
bool RoomForMoreConcurrent();
void ActivateStream(SpdyStream31 *);
void ProcessPending(); void ProcessPending();
nsresult SetInputFrameDataStream(uint32_t); nsresult SetInputFrameDataStream(uint32_t);
bool VerifyStream(SpdyStream31 *, uint32_t); bool VerifyStream(SpdyStream31 *, uint32_t);
@ -234,6 +233,10 @@ private:
void UpdateLocalStreamWindow(SpdyStream31 *stream, uint32_t bytes); void UpdateLocalStreamWindow(SpdyStream31 *stream, uint32_t bytes);
void UpdateLocalSessionWindow(uint32_t bytes); void UpdateLocalSessionWindow(uint32_t bytes);
bool RoomForMoreConcurrent();
void IncrementConcurrent(SpdyStream31 *stream);
void QueueStream(SpdyStream31 *stream);
// a wrapper for all calls to the nshttpconnection level segment writer. Used // a wrapper for all calls to the nshttpconnection level segment writer. Used
// to track network I/O for timeout purposes // to track network I/O for timeout purposes
nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *); nsresult NetworkRead(nsAHttpSegmentWriter *, char *, uint32_t, uint32_t *);

Просмотреть файл

@ -42,7 +42,8 @@ SpdyStream31::SpdyStream31(nsAHttpTransaction *httpTransaction,
: mStreamID(0) : mStreamID(0)
, mSession(spdySession) , mSession(spdySession)
, mUpstreamState(GENERATING_SYN_STREAM) , mUpstreamState(GENERATING_SYN_STREAM)
, mSynFrameComplete(0) , mRequestHeadersDone(0)
, mSynFrameGenerated(0)
, mSentFinOnData(0) , mSentFinOnData(0)
, mTransaction(httpTransaction) , mTransaction(httpTransaction)
, mSocketTransport(spdySession->SocketTransport()) , mSocketTransport(spdySession->SocketTransport())
@ -55,6 +56,7 @@ SpdyStream31::SpdyStream31(nsAHttpTransaction *httpTransaction,
, mSentWaitingFor(0) , mSentWaitingFor(0)
, mReceivedData(0) , mReceivedData(0)
, mSetTCPSocketBuffer(0) , mSetTCPSocketBuffer(0)
, mCountAsActive(0)
, mTxInlineFrameSize(SpdySession31::kDefaultBufferSize) , mTxInlineFrameSize(SpdySession31::kDefaultBufferSize)
, mTxInlineFrameUsed(0) , mTxInlineFrameUsed(0)
, mTxStreamFrameSize(0) , mTxStreamFrameSize(0)
@ -129,7 +131,7 @@ SpdyStream31::ReadSegments(nsAHttpSegmentReader *reader,
// If not, mark the stream for callback when writing can proceed. // If not, mark the stream for callback when writing can proceed.
if (NS_SUCCEEDED(rv) && if (NS_SUCCEEDED(rv) &&
mUpstreamState == GENERATING_SYN_STREAM && mUpstreamState == GENERATING_SYN_STREAM &&
!mSynFrameComplete) !mRequestHeadersDone)
mSession->TransactionHasDataToWrite(this); mSession->TransactionHasDataToWrite(this);
// mTxinlineFrameUsed represents any queued un-sent frame. It might // mTxinlineFrameUsed represents any queued un-sent frame. It might
@ -259,10 +261,11 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf,
uint32_t *countUsed) uint32_t *countUsed)
{ {
// Returns NS_OK even if the headers are incomplete // Returns NS_OK even if the headers are incomplete
// set mSynFrameComplete flag if they are complete // set mRequestHeadersDone flag if they are complete
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mUpstreamState == GENERATING_SYN_STREAM); MOZ_ASSERT(mUpstreamState == GENERATING_SYN_STREAM);
MOZ_ASSERT(!mRequestHeadersDone);
LOG3(("SpdyStream31::ParseHttpRequestHeaders %p avail=%d state=%x", LOG3(("SpdyStream31::ParseHttpRequestHeaders %p avail=%d state=%x",
this, avail, mUpstreamState)); this, avail, mUpstreamState));
@ -288,7 +291,7 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf,
uint32_t oldLen = mFlatHttpRequestHeaders.Length(); uint32_t oldLen = mFlatHttpRequestHeaders.Length();
mFlatHttpRequestHeaders.SetLength(endHeader + 2); mFlatHttpRequestHeaders.SetLength(endHeader + 2);
*countUsed = avail - (oldLen - endHeader) + 4; *countUsed = avail - (oldLen - endHeader) + 4;
mSynFrameComplete = 1; mRequestHeadersDone = 1;
nsAutoCString hostHeader; nsAutoCString hostHeader;
nsAutoCString hashkey; nsAutoCString hashkey;
@ -330,15 +333,24 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf,
// There is probably pushed data buffered so trigger a read manually // There is probably pushed data buffered so trigger a read manually
// as we can't rely on future network events to do it // as we can't rely on future network events to do it
mSession->ConnectPushedStream(this); mSession->ConnectPushedStream(this);
mSynFrameGenerated = 1;
return NS_OK; return NS_OK;
} }
} }
return NS_OK;
}
nsresult
SpdyStream31::GenerateSynFrame()
{
// It is now OK to assign a streamID that we are assured will // It is now OK to assign a streamID that we are assured will
// be monotonically increasing amongst syn-streams on this // be monotonically increasing amongst syn-streams on this
// session // session
mStreamID = mSession->RegisterStreamID(this); mStreamID = mSession->RegisterStreamID(this);
MOZ_ASSERT(mStreamID & 1, "Spdy Stream Channel ID must be odd"); MOZ_ASSERT(mStreamID & 1, "Spdy Stream Channel ID must be odd");
MOZ_ASSERT(!mSynFrameGenerated);
mSynFrameGenerated = 1;
if (mStreamID >= 0x80000000) { if (mStreamID >= 0x80000000) {
// streamID must fit in 31 bits. This is theoretically possible // streamID must fit in 31 bits. This is theoretically possible
@ -507,6 +519,8 @@ SpdyStream31::ParseHttpRequestHeaders(const char *buf,
CompressToFrame(NS_LITERAL_CSTRING(":version")); CompressToFrame(NS_LITERAL_CSTRING(":version"));
CompressToFrame(versionHeader); CompressToFrame(versionHeader);
nsAutoCString hostHeader;
mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader);
CompressToFrame(NS_LITERAL_CSTRING(":host")); CompressToFrame(NS_LITERAL_CSTRING(":host"));
CompressToFrame(hostHeader); CompressToFrame(hostHeader);
@ -1455,12 +1469,26 @@ SpdyStream31::OnReadSegment(const char *buf,
// the number of those bytes that we consume (i.e. the portion that are // the number of those bytes that we consume (i.e. the portion that are
// header bytes) // header bytes)
rv = ParseHttpRequestHeaders(buf, count, countRead); if (!mRequestHeadersDone) {
if (NS_FAILED(rv)) if (NS_FAILED(rv = ParseHttpRequestHeaders(buf, count, countRead))) {
return rv; return rv;
LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d", }
this, *countRead, count, mSynFrameComplete)); }
if (mSynFrameComplete) {
if (mRequestHeadersDone && !mSynFrameGenerated) {
if (!mSession->TryToActivate(this)) {
LOG3(("SpdyStream31::OnReadSegment %p cannot activate now. queued.\n", this));
return NS_OK;
}
if (NS_FAILED(rv = GenerateSynFrame())) {
return rv;
}
}
LOG3(("ParseHttpRequestHeaders %p used %d of %d. "
"requestheadersdone = %d mSynFrameGenerated = %d\n",
this, *countRead, count, mRequestHeadersDone, mSynFrameGenerated));
if (mSynFrameGenerated) {
AdjustInitialWindow(); AdjustInitialWindow();
rv = TransmitFrame(nullptr, nullptr, true); rv = TransmitFrame(nullptr, nullptr, true);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) { if (rv == NS_BASE_STREAM_WOULD_BLOCK) {

Просмотреть файл

@ -55,6 +55,9 @@ public:
void SetRecvdData(bool aStatus) { mReceivedData = aStatus ? 1 : 0; } void SetRecvdData(bool aStatus) { mReceivedData = aStatus ? 1 : 0; }
bool RecvdData() { return mReceivedData; } bool RecvdData() { return mReceivedData; }
void SetCountAsActive(bool aStatus) { mCountAsActive = aStatus ? 1 : 0; }
bool CountAsActive() { return mCountAsActive; }
void UpdateTransportSendEvents(uint32_t count); void UpdateTransportSendEvents(uint32_t count);
void UpdateTransportReadEvents(uint32_t count); void UpdateTransportReadEvents(uint32_t count);
@ -118,8 +121,11 @@ protected:
// sending_request_body for each SPDY chunk in the upload. // sending_request_body for each SPDY chunk in the upload.
enum stateType mUpstreamState; enum stateType mUpstreamState;
// Flag is set when all http request headers have been read and ID is stable // Flag is set when all http request headers have been read
uint32_t mSynFrameComplete : 1; uint32_t mRequestHeadersDone : 1;
// Flag is set when stream ID is stable
uint32_t mSynFrameGenerated : 1;
// Flag is set when a FIN has been placed on a data or syn packet // Flag is set when a FIN has been placed on a data or syn packet
// (i.e after the client has closed) // (i.e after the client has closed)
@ -135,6 +141,8 @@ private:
void *); void *);
nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *); nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *);
nsresult GenerateSynFrame();
void AdjustInitialWindow(); void AdjustInitialWindow();
nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment); nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment);
void GenerateDataFrameHeader(uint32_t, bool); void GenerateDataFrameHeader(uint32_t, bool);
@ -185,6 +193,9 @@ private:
// Flag is set after TCP send autotuning has been disabled // Flag is set after TCP send autotuning has been disabled
uint32_t mSetTCPSocketBuffer : 1; uint32_t mSetTCPSocketBuffer : 1;
// Flag is set when stream is counted towards MAX_CONCURRENT streams in session
uint32_t mCountAsActive : 1;
// The InlineFrame and associated data is used for composing control // The InlineFrame and associated data is used for composing control
// frames and data frame headers. // frames and data frame headers.
nsAutoArrayPtr<uint8_t> mTxInlineFrame; nsAutoArrayPtr<uint8_t> mTxInlineFrame;