bug 790388 part 3 - spdy/3 server push r=hurley

--HG--
extra : rebase_source : 1ed60ebda0b790f0ff1578abe7a7732676b495cf
This commit is contained in:
Patrick McManus 2013-05-29 00:07:03 -04:00
Родитель e4215ee002
Коммит e18fe0b0fc
21 изменённых файлов: 1287 добавлений и 178 удалений

Просмотреть файл

@ -50,6 +50,9 @@ pref("network.http.max-connections", 20);
pref("network.http.max-persistent-connections-per-server", 6);
pref("network.http.max-persistent-connections-per-proxy", 20);
// spdy
pref("network.http.spdy.push-allowance", 32768);
// See bug 545869 for details on why these are set the way they are
pref("network.buffer.cache.count", 24);
pref("network.buffer.cache.size", 16384);

Просмотреть файл

@ -95,6 +95,9 @@ pref("network.http.max-connections", 20);
pref("network.http.max-persistent-connections-per-server", 6);
pref("network.http.max-persistent-connections-per-proxy", 20);
// spdy
pref("network.http.spdy.push-allowance", 32768);
// See bug 545869 for details on why these are set the way they are
pref("network.buffer.cache.count", 24);
pref("network.buffer.cache.size", 16384);

Просмотреть файл

@ -1020,6 +1020,8 @@ pref("network.http.spdy.persistent-settings", false);
pref("network.http.spdy.ping-threshold", 58);
pref("network.http.spdy.ping-timeout", 8);
pref("network.http.spdy.send-buffer-size", 131072);
pref("network.http.spdy.allow-push", true);
pref("network.http.spdy.push-allowance", 65536);
pref("network.http.diagnostics", false);

Просмотреть файл

@ -80,11 +80,18 @@ interface nsILoadGroup : nsIRequest
readonly attribute nsILoadGroupConnectionInfo connectionInfo;
};
%{C++
#include "mozilla/net/PSpdyPush3.h"
%}
[ptr] native SpdyPushCache3Ptr(mozilla::net::SpdyPushCache3);
/**
* Used to maintain state about the connections of a load group and
* how they interact with blocking items like HEAD css/js loads.
*/
[uuid(d1f9f18e-3d85-473a-ad58-a2367d7cdb2a)]
[uuid(5361f30e-f968-437c-8f41-69d2756a6022)]
interface nsILoadGroupConnectionInfo : nsISupports
{
/**
@ -104,4 +111,11 @@ interface nsILoadGroupConnectionInfo : nsISupports
* blockers.
*/
unsigned long removeBlockingTransaction();
/* reading this attribute gives out weak pointers to the push
* cache. The nsILoadGroupConnectionInfo implemenation owns the cache
* and will destroy it when overwritten or when the load group
* ends.
*/
[noscript] attribute SpdyPushCache3Ptr spdyPushCache3;
};

Просмотреть файл

@ -1054,6 +1054,7 @@ public:
nsLoadGroupConnectionInfo();
private:
int32_t mBlockingTransactionCount; // signed for PR_ATOMIC_*
nsAutoPtr<mozilla::net::SpdyPushCache3> mSpdyCache3;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(nsLoadGroupConnectionInfo, nsILoadGroupConnectionInfo)
@ -1087,6 +1088,20 @@ nsLoadGroupConnectionInfo::RemoveBlockingTransaction(uint32_t *_retval)
return NS_OK;
}
/* [noscript] attribute SpdyPushCache3Ptr spdyPushCache3; */
NS_IMETHODIMP
nsLoadGroupConnectionInfo::GetSpdyPushCache3(mozilla::net::SpdyPushCache3 **aSpdyPushCache3)
{
*aSpdyPushCache3 = mSpdyCache3.get();
return NS_OK;
}
NS_IMETHODIMP
nsLoadGroupConnectionInfo::SetSpdyPushCache3(mozilla::net::SpdyPushCache3 *aSpdyPushCache3)
{
mSpdyCache3 = aSpdyPushCache3;
return NS_OK;
}
nsresult nsLoadGroup::Init()
{
static PLDHashTableOps hash_table_ops =

Просмотреть файл

@ -42,6 +42,12 @@ public:
const static uint32_t kSendingChunkSize = 4096;
const static uint32_t kTCPSendBufferSize = 131072;
// until we have an API that can push back on receiving data (right now
// WriteSegments is obligated to accept data and buffer) there is no
// reason to throttle with the rwin other than in server push
// scenarios.
const static uint32_t kInitialRwin = 256 * 1024 * 1024;
};
// this is essentially a single instantiation as a member of nsHttpHandler.

Просмотреть файл

@ -0,0 +1,60 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// SPDY Server Push as defined by
// http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3
/*
A pushed stream is put into a memory buffer (The SpdyPushTransactionBuffer)
and spooled there until a GET is made that can be matched up with it. At
that time we have two spdy streams - the GET (aka the sink) and the PUSH
(aka the source). Data is copied between those two streams for the lifetime
of the transaction. This is true even if the transaction buffer is empty,
partly complete, or totally loaded at the time the GET correspondence is made.
correspondence is done through a hash table of the full url, the spdy session,
and the load group. The load group is implicit because that's where the
hash is stored, the other items comprise the hash key.
Pushed streams are subject to aggressive flow control before they are matched
with a GET at which point flow control is effectively disabled to match the
client pull behavior.
*/
#ifndef mozilla_net_SpdyPush3_Public_h
#define mozilla_net_SpdyPush3_Public_h
#include "nsAutoPtr.h"
#include "nsDataHashtable.h"
#include "nsISupports.h"
class nsCString;
namespace mozilla {
namespace net {
class SpdyPushedStream3;
// One Cache per load group
class SpdyPushCache3
{
public:
SpdyPushCache3();
virtual ~SpdyPushCache3();
// The cache holds only weak pointers - no references
bool RegisterPushedStream(nsCString key,
SpdyPushedStream3 *stream);
SpdyPushedStream3 *RemovePushedStream(nsCString key);
SpdyPushedStream3 *GetPushedStream(nsCString key);
private:
nsDataHashtable<nsCStringHashKey, SpdyPushedStream3 *> mHash;
};
} // namespace mozilla::net
} // namespace mozilla
#endif // mozilla_net_SpdyPush3_Public_h

Просмотреть файл

@ -0,0 +1,395 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include <algorithm>
#include "nsDependentString.h"
#include "SpdyPush3.h"
namespace mozilla {
namespace net {
//////////////////////////////////////////
// SpdyPushedStream3
//////////////////////////////////////////
SpdyPushedStream3::SpdyPushedStream3(SpdyPush3TransactionBuffer *aTransaction,
SpdySession3 *aSession,
SpdyStream3 *aAssociatedStream,
uint32_t aID)
:SpdyStream3(aTransaction, aSession,
0 /* priority is only for sending, so ignore it on push */)
, mConsumerStream(nullptr)
, mBufferedPush(aTransaction)
, mStatus(NS_OK)
, mPushCompleted(false)
, mDeferCleanupOnSuccess(true)
{
LOG3(("SpdyPushedStream3 ctor this=%p 0x%X\n", this, aID));
mStreamID = aID;
mBufferedPush->SetPushStream(this);
mLoadGroupCI = aAssociatedStream->LoadGroupConnectionInfo();
mLastRead = TimeStamp::Now();
}
bool
SpdyPushedStream3::GetPushComplete()
{
return mPushCompleted;
}
nsresult
SpdyPushedStream3::WriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
{
nsresult rv = SpdyStream3::WriteSegments(writer, count, countWritten);
if (NS_SUCCEEDED(rv) && *countWritten) {
mLastRead = TimeStamp::Now();
}
if (rv == NS_BASE_STREAM_CLOSED) {
mPushCompleted = true;
rv = NS_OK; // this is what a normal HTTP transaction would do
}
if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_FAILED(rv))
mStatus = rv;
return rv;
}
nsresult
SpdyPushedStream3::ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *count)
{
// The SYN_STREAM for this has been processed, so we need to verify
// that :host, :scheme, and :path MUST be present
nsDependentCSubstring host, scheme, path;
nsresult rv;
rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":host"), host);
if (NS_FAILED(rv)) {
LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X "
"push without required :host\n", mSession, mStreamID));
return rv;
}
rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":scheme"), scheme);
if (NS_FAILED(rv)) {
LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X "
"push without required :scheme\n", mSession, mStreamID));
return rv;
}
rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":path"), path);
if (NS_FAILED(rv)) {
LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X "
"push without required :host\n", mSession, mStreamID));
return rv;
}
CreatePushHashKey(nsCString(scheme), nsCString(host),
mSession->Serial(), path,
mOrigin, mHashKey);
LOG3(("SpdyPushStream3 0x%X hash key %s\n", mStreamID, mHashKey.get()));
// the write side of a pushed transaction just involves manipulating a little state
SpdyStream3::mSentFinOnData = 1;
SpdyStream3::mSynFrameComplete = 1;
SpdyStream3::ChangeState(UPSTREAM_COMPLETE);
*count = 0;
return NS_OK;
}
bool
SpdyPushedStream3::GetHashKey(nsCString &key)
{
if (mHashKey.IsEmpty())
return false;
key = mHashKey;
return true;
}
void
SpdyPushedStream3::ConnectPushedStream(SpdyStream3 *stream)
{
mSession->ConnectPushedStream(stream);
}
bool
SpdyPushedStream3::IsOrphaned(TimeStamp now)
{
MOZ_ASSERT(!now.IsNull());
// if spdy is not transmitting, and is also not connected to a consumer
// stream, and its been like that for too long then it is oprhaned
if (mConsumerStream)
return false;
bool rv = ((now - mLastRead).ToSeconds() > 30.0);
if (rv) {
LOG3(("SpdyPushCache3::IsOrphaned 0x%X IsOrphaned %3.2f\n",
mStreamID, (now - mLastRead).ToSeconds()));
}
return rv;
}
nsresult
SpdyPushedStream3::GetBufferedData(char *buf,
uint32_t count,
uint32_t *countWritten)
{
if (NS_FAILED(mStatus))
return mStatus;
nsresult rv = mBufferedPush->GetBufferedData(buf, count, countWritten);
if (NS_FAILED(rv))
return rv;
if (!*countWritten)
rv = GetPushComplete() ? NS_BASE_STREAM_CLOSED : NS_BASE_STREAM_WOULD_BLOCK;
return rv;
}
//////////////////////////////////////////
// SpdyPushCache3
//////////////////////////////////////////
SpdyPushCache3::SpdyPushCache3()
{
mHash.Init();
}
SpdyPushCache3::~SpdyPushCache3()
{
mHash.Clear();
}
SpdyPushedStream3 *
SpdyPushCache3::GetPushedStream(nsCString key)
{
return mHash.Get(key);
}
bool
SpdyPushCache3::RegisterPushedStream(nsCString key,
SpdyPushedStream3 *stream)
{
LOG3(("SpdyPushCache3::RegisterPushedStream %s 0x%X\n",
key.get(), stream->StreamID()));
if(mHash.Get(key))
return false;
mHash.Put(key, stream);
return true;
}
SpdyPushedStream3 *
SpdyPushCache3::RemovePushedStream(nsCString key)
{
SpdyPushedStream3 *rv = mHash.Get(key);
LOG3(("SpdyPushCache3::RemovePushedStream %s 0x%X\n",
key.get(), rv ? rv->StreamID() : 0));
if (rv)
mHash.Remove(key);
return rv;
}
//////////////////////////////////////////
// SpdyPush3TransactionBuffer
// This is the nsAHttpTransction owned by the stream when the pushed
// stream has not yet been matched with a pull request
//////////////////////////////////////////
NS_IMPL_THREADSAFE_ISUPPORTS0(SpdyPush3TransactionBuffer)
SpdyPush3TransactionBuffer::SpdyPush3TransactionBuffer()
: mStatus(NS_OK)
, mRequestHead(nullptr)
, mPushStream(nullptr)
, mIsDone(false)
, mBufferedHTTP1Size(kDefaultBufferSize)
, mBufferedHTTP1Used(0)
, mBufferedHTTP1Consumed(0)
{
mBufferedHTTP1 = new char[mBufferedHTTP1Size];
}
SpdyPush3TransactionBuffer::~SpdyPush3TransactionBuffer()
{
delete mRequestHead;
}
void
SpdyPush3TransactionBuffer::SetConnection(nsAHttpConnection *conn)
{
}
nsAHttpConnection *
SpdyPush3TransactionBuffer::Connection()
{
return nullptr;
}
void
SpdyPush3TransactionBuffer::GetSecurityCallbacks(nsIInterfaceRequestor **outCB)
{
*outCB = nullptr;
}
void
SpdyPush3TransactionBuffer::OnTransportStatus(nsITransport* transport,
nsresult status, uint64_t progress)
{
}
bool
SpdyPush3TransactionBuffer::IsDone()
{
return mIsDone;
}
nsresult
SpdyPush3TransactionBuffer::Status()
{
return mStatus;
}
uint32_t
SpdyPush3TransactionBuffer::Caps()
{
return 0;
}
uint64_t
SpdyPush3TransactionBuffer::Available()
{
return mBufferedHTTP1Used - mBufferedHTTP1Consumed;
}
nsresult
SpdyPush3TransactionBuffer::ReadSegments(nsAHttpSegmentReader *reader,
uint32_t count, uint32_t *countRead)
{
*countRead = 0;
return NS_ERROR_NOT_IMPLEMENTED;
}
nsresult
SpdyPush3TransactionBuffer::WriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count, uint32_t *countWritten)
{
if ((mBufferedHTTP1Size - mBufferedHTTP1Used) < 20480) {
SpdySession3::EnsureBuffer(mBufferedHTTP1,
mBufferedHTTP1Size + kDefaultBufferSize,
mBufferedHTTP1Used,
mBufferedHTTP1Size);
}
count = std::min(count, mBufferedHTTP1Size - mBufferedHTTP1Used);
nsresult rv = writer->OnWriteSegment(mBufferedHTTP1 + mBufferedHTTP1Used,
count, countWritten);
if (NS_SUCCEEDED(rv)) {
mBufferedHTTP1Used += *countWritten;
}
else if (rv == NS_BASE_STREAM_CLOSED) {
mIsDone = true;
}
if (Available()) {
SpdyStream3 *consumer = mPushStream->GetConsumerStream();
if (consumer) {
LOG3(("SpdyPush3TransactionBuffer::WriteSegments notifying connection "
"consumer data available 0x%X [%u]\n",
mPushStream->StreamID(), Available()));
mPushStream->ConnectPushedStream(consumer);
}
}
return rv;
}
uint32_t
SpdyPush3TransactionBuffer::Http1xTransactionCount()
{
return 0;
}
nsHttpRequestHead *
SpdyPush3TransactionBuffer::RequestHead()
{
if (!mRequestHead)
mRequestHead = new nsHttpRequestHead();
return mRequestHead;
}
nsresult
SpdyPush3TransactionBuffer::TakeSubTransactions(
nsTArray<nsRefPtr<nsAHttpTransaction> > &outTransactions)
{
return NS_ERROR_NOT_IMPLEMENTED;
}
void
SpdyPush3TransactionBuffer::SetProxyConnectFailed()
{
}
void
SpdyPush3TransactionBuffer::Close(nsresult reason)
{
mStatus = reason;
mIsDone = true;
}
nsresult
SpdyPush3TransactionBuffer::AddTransaction(nsAHttpTransaction *trans)
{
return NS_ERROR_NOT_IMPLEMENTED;
}
uint32_t
SpdyPush3TransactionBuffer::PipelineDepth()
{
return 0;
}
nsresult
SpdyPush3TransactionBuffer::SetPipelinePosition(int32_t position)
{
return NS_OK;
}
int32_t
SpdyPush3TransactionBuffer::PipelinePosition()
{
return 1;
}
nsresult
SpdyPush3TransactionBuffer::GetBufferedData(char *buf,
uint32_t count,
uint32_t *countWritten)
{
*countWritten = std::min(count, static_cast<uint32_t>(Available()));
if (*countWritten) {
memcpy(buf, mBufferedHTTP1 + mBufferedHTTP1Consumed, *countWritten);
mBufferedHTTP1Consumed += *countWritten;
}
// If all the data has been consumed then reset the buffer
if (mBufferedHTTP1Consumed == mBufferedHTTP1Used) {
mBufferedHTTP1Consumed = 0;
mBufferedHTTP1Used = 0;
}
return NS_OK;
}
} // namespace mozilla::net
} // namespace mozilla

Просмотреть файл

@ -0,0 +1,102 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
// SPDY Server Push as defined by
// http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3
#ifndef mozilla_net_SpdyPush3_Internal_h
#define mozilla_net_SpdyPush3_Internal_h
#include "mozilla/Attributes.h"
#include "mozilla/TimeStamp.h"
#include "nsHttpRequestHead.h"
#include "nsILoadGroup.h"
#include "nsString.h"
#include "PSpdyPush3.h"
#include "SpdySession3.h"
#include "SpdyStream3.h"
namespace mozilla {
namespace net {
class SpdyPush3TransactionBuffer;
class SpdyPushedStream3 MOZ_FINAL : public SpdyStream3
{
public:
SpdyPushedStream3(SpdyPush3TransactionBuffer *aTransaction,
SpdySession3 *aSession,
SpdyStream3 *aAssociatedStream,
uint32_t aID);
virtual ~SpdyPushedStream3() {}
bool GetPushComplete();
SpdyStream3 *GetConsumerStream() { return mConsumerStream; };
void SetConsumerStream(SpdyStream3 *aStream) { mConsumerStream = aStream; }
bool GetHashKey(nsCString &key);
// override of SpdyStream3
nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *);
nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *);
nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return mLoadGroupCI; };
void ConnectPushedStream(SpdyStream3 *consumer);
bool DeferCleanupOnSuccess() { return mDeferCleanupOnSuccess; }
void SetDeferCleanupOnSuccess(bool val) { mDeferCleanupOnSuccess = val; }
bool IsOrphaned(TimeStamp now);
nsresult GetBufferedData(char *buf, uint32_t count, uint32_t *countWritten);
// overload of SpdyStream3
virtual bool HasSink() { return !!mConsumerStream; }
private:
SpdyStream3 *mConsumerStream; // paired request stream that consumes from
// real spdy one.. null until a match is made.
nsCOMPtr<nsILoadGroupConnectionInfo> mLoadGroupCI;
SpdyPush3TransactionBuffer *mBufferedPush;
mozilla::TimeStamp mLastRead;
nsCString mHashKey;
nsresult mStatus;
bool mPushCompleted; // server push FIN received
bool mDeferCleanupOnSuccess;
};
class SpdyPush3TransactionBuffer MOZ_FINAL : public nsAHttpTransaction
{
public:
NS_DECL_ISUPPORTS
NS_DECL_NSAHTTPTRANSACTION
SpdyPush3TransactionBuffer();
virtual ~SpdyPush3TransactionBuffer();
nsresult GetBufferedData(char *buf, uint32_t count, uint32_t *countWritten);
void SetPushStream(SpdyPushedStream3 *stream) { mPushStream = stream; }
private:
const static uint32_t kDefaultBufferSize = 4096;
nsresult mStatus;
nsHttpRequestHead *mRequestHead;
SpdyPushedStream3 *mPushStream;
bool mIsDone;
nsAutoArrayPtr<char> mBufferedHTTP1;
uint32_t mBufferedHTTP1Size;
uint32_t mBufferedHTTP1Used;
uint32_t mBufferedHTTP1Consumed;
};
} // namespace mozilla::net
} // namespace mozilla
#endif // mozilla_net_SpdyPush3_Internal_h

Просмотреть файл

@ -4,15 +4,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nsHttp.h"
#include "SpdySession3.h"
#include "SpdyStream3.h"
#include "nsHttpConnection.h"
#include "nsHttpHandler.h"
#include "prnetdb.h"
#include "mozilla/Telemetry.h"
#include "mozilla/Preferences.h"
#include "nsHttp.h"
#include "nsHttpHandler.h"
#include "nsHttpConnection.h"
#include "nsILoadGroup.h"
#include "prprf.h"
#include "prnetdb.h"
#include "SpdyPush3.h"
#include "SpdySession3.h"
#include "SpdyStream3.h"
#include <algorithm>
#ifdef DEBUG
@ -38,7 +41,6 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction,
: mSocketTransport(aSocketTransport),
mSegmentReader(nullptr),
mSegmentWriter(nullptr),
mSendingChunkSize(ASpdySession::kSendingChunkSize),
mNextStreamID(1),
mConcurrentHighWater(0),
mDownstreamState(BUFFERING_FRAME_HEADER),
@ -65,8 +67,11 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction,
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
LOG3(("SpdySession3::SpdySession3 %p transaction 1 = %p",
this, aHttpTransaction));
static uint64_t sSerial;
mSerial = ++sSerial;
LOG3(("SpdySession3::SpdySession3 %p transaction 1 = %p serial=0x%X\n",
this, aHttpTransaction, mSerial));
mStreamIDHash.Init();
mStreamTransactionHash.Init();
@ -75,6 +80,7 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction,
mOutputQueueBuffer = new char[mOutputQueueSize];
zlibInit();
mPushAllowance = gHttpHandler->SpdyPushAllowance();
mSendingChunkSize = gHttpHandler->SpdySendingChunkSize();
GenerateSettings();
@ -116,9 +122,12 @@ SpdySession3::GoAwayEnumerator(nsAHttpTransaction *key,
// these streams were not processed by the server and can be restarted.
// Do that after the enumerator completes to avoid the risk of
// a restart event re-entrantly modifying this hash.
if (stream->StreamID() > self->mGoAwayID || !stream->HasRegisteredID())
// a restart event re-entrantly modifying this hash. Be sure not to restart
// a pushed (even numbered) stream
if ((stream->StreamID() > self->mGoAwayID && (stream->StreamID() & 1)) ||
!stream->HasRegisteredID()) {
self->mGoAwayStreamsToRestart.Push(stream);
}
return PL_DHASH_NEXT;
}
@ -220,12 +229,12 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now)
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mNextPingID & 1, "Ping Counter Not Odd");
if (!mPingThreshold)
return;
LOG(("SpdySession3::ReadTimeoutTick %p delta since last read %ds\n",
this, PR_IntervalToSeconds(now - mLastReadEpoch)));
if (!mPingThreshold)
return;
if ((now - mLastReadEpoch) < mPingThreshold) {
// recent activity means ping is not an issue
if (mPingSentEpoch)
@ -260,6 +269,35 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now)
mNextPingID += 2;
ResumeRecv(); // read the ping reply
// Check for orphaned push streams. This looks expensive, but generally the
// list is empty.
SpdyPushedStream3 *deleteMe;
TimeStamp timestampNow;
do {
deleteMe = nullptr;
for (uint32_t index = mPushedStreams.Length();
index > 0 ; --index) {
SpdyPushedStream3 *pushedStream = mPushedStreams[index - 1];
if (timestampNow.IsNull())
timestampNow = TimeStamp::Now(); // lazy initializer
// if spdy finished, but not connected, and its been like that for too long..
// cleanup the stream..
if (pushedStream->IsOrphaned(timestampNow))
{
LOG3(("SpdySession3 Timeout Pushed Stream %p 0x%X\n",
this, pushedStream->StreamID()));
deleteMe = pushedStream;
break; // don't CleanupStream() while iterating this vector
}
}
if (deleteMe)
CleanupStream(deleteMe, NS_ERROR_ABORT, RST_CANCEL);
} while (deleteMe);
if (mNextPingID == 0xffffffff) {
LOG(("SpdySession3::ReadTimeoutTick %p "
"ping ids exhausted marking goaway\n", this));
@ -268,35 +306,41 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now)
}
uint32_t
SpdySession3::RegisterStreamID(SpdyStream3 *stream)
SpdySession3::RegisterStreamID(SpdyStream3 *stream, uint32_t aNewID)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
LOG3(("SpdySession3::RegisterStreamID session=%p stream=%p id=0x%X "
"concurrent=%d",this, stream, mNextStreamID, mConcurrent));
MOZ_ASSERT(mNextStreamID < 0xfffffff0,
"should have stopped admitting streams");
uint32_t result = mNextStreamID;
MOZ_ASSERT(!(aNewID & 1),
"0 for autoassign pull, otherwise explicit even push assignment");
if (!aNewID) {
// auto generate a new pull stream ID
aNewID = mNextStreamID;
MOZ_ASSERT(aNewID & 1, "pull ID must be odd.");
mNextStreamID += 2;
}
LOG3(("SpdySession3::RegisterStreamID session=%p stream=%p id=0x%X "
"concurrent=%d",this, stream, aNewID, mConcurrent));
// We've used up plenty of ID's on this session. Start
// moving to a new one before there is a crunch involving
// server push streams or concurrent non-registered submits
if (mNextStreamID >= kMaxStreamID)
if (aNewID >= kMaxStreamID)
mShouldGoAway = true;
// integrity check
if (mStreamIDHash.Get(result)) {
if (mStreamIDHash.Get(aNewID)) {
LOG3((" New ID already present\n"));
MOZ_ASSERT(false, "New ID already present in mStreamIDHash");
mShouldGoAway = true;
return kDeadStreamID;
}
mStreamIDHash.Put(result, stream);
return result;
mStreamIDHash.Put(aNewID, stream);
return aNewID;
}
bool
@ -313,12 +357,7 @@ SpdySession3::AddStream(nsAHttpTransaction *aHttpTransaction,
}
aHttpTransaction->SetConnection(this);
SpdyStream3 *stream = new SpdyStream3(aHttpTransaction,
this,
mSocketTransport,
mSendingChunkSize,
&mUpstreamZlib,
aPriority);
SpdyStream3 *stream = new SpdyStream3(aHttpTransaction, this, aPriority);
LOG3(("SpdySession3::AddStream session=%p stream=%p NextID=0x%X (tentative)",
this, stream, mNextStreamID));
@ -331,8 +370,7 @@ SpdySession3::AddStream(nsAHttpTransaction *aHttpTransaction,
ActivateStream(stream);
}
else {
LOG3(("SpdySession3::AddStream %p stream %p queued.",
this, stream));
LOG3(("SpdySession3::AddStream %p stream %p queued.", this, stream));
mQueuedStreams.Push(stream);
}
@ -343,8 +381,10 @@ void
SpdySession3::ActivateStream(SpdyStream3 *stream)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1),
"Do not activate pushed streams");
mConcurrent++;
++mConcurrent;
if (mConcurrent > mConcurrentHighWater)
mConcurrentHighWater = mConcurrent;
LOG3(("SpdySession3::AddStream %p activating stream %p Currently %d "
@ -486,9 +526,9 @@ SpdySession3::ResetDownstreamState()
if (mInputFrameDataLast && mInputFrameDataStream) {
mInputFrameDataLast = false;
if (!mInputFrameDataStream->RecvdFin()) {
LOG3((" SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID()));
mInputFrameDataStream->SetRecvdFin(true);
--mConcurrent;
ProcessPending();
DecrementConcurrent(mInputFrameDataStream);
}
}
mInputFrameBufferUsed = 0;
@ -529,6 +569,21 @@ SpdySession3::EnsureBuffer(nsAutoArrayPtr<uint8_t> &buf,
uint32_t preserve,
uint32_t &objSize);
void
SpdySession3::DecrementConcurrent(SpdyStream3 *aStream)
{
uint32_t id = aStream->StreamID();
if (id && !(id & 0x1))
return; // pushed streams aren't counted in concurrent limit
MOZ_ASSERT(mConcurrent);
--mConcurrent;
LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n",
this, id, mConcurrent));
ProcessPending();
}
void
SpdySession3::zlibInit()
{
@ -692,11 +747,13 @@ SpdySession3::GenerateSettings()
// 2nd entry is bytes 20 to 27
// 3rd entry is bytes 28 to 35
if (!gHttpHandler->AllowSpdyPush()) {
// announcing that we accept 0 incoming streams is done to
// disable server push until that is implemented.
// disable server push
packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_MAX_CONCURRENT;
// The value portion of the setting pair is already initialized to 0
numberOfEntries++;
}
nsRefPtr<nsHttpConnectionInfo> ci;
uint32_t cwnd = 0;
@ -712,8 +769,11 @@ SpdySession3::GenerateSettings()
numberOfEntries++;
}
// Advertise the Push RWIN and on each client SYN_STREAM pipeline
// a window update with it in order to use larger initial windows with pulled
// streams.
packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_INITIAL_WINDOW;
uint32_t rwin = PR_htonl(kInitialRwin);
uint32_t rwin = PR_htonl(mPushAllowance);
memcpy(packet + 16 + 8 * numberOfEntries, &rwin, 4);
numberOfEntries++;
@ -780,6 +840,7 @@ SpdySession3::VerifyStream(SpdyStream3 *aStream, uint32_t aOptionalID = 0)
"optionalID=0x%X trans=%p test=%d\n",
this, aStream, aStream->StreamID(),
aOptionalID, aStream->Transaction(), test));
MOZ_ASSERT(false, "VerifyStream");
return false;
}
@ -792,24 +853,39 @@ SpdySession3::CleanupStream(SpdyStream3 *aStream, nsresult aResult,
LOG3(("SpdySession3::CleanupStream %p %p 0x%X %X\n",
this, aStream, aStream->StreamID(), aResult));
SpdyPushedStream3 *pushSource = nullptr;
if (NS_SUCCEEDED(aResult) && aStream->DeferCleanupOnSuccess()) {
LOG(("SpdySession3::CleanupStream 0x%X deferred\n", aStream->StreamID()));
return;
}
if (!VerifyStream(aStream)) {
LOG(("SpdySession3::CleanupStream failed to verify stream\n"));
return;
}
pushSource = aStream->PushSource();
if (!aStream->RecvdFin() && aStream->StreamID()) {
LOG3(("Stream had not processed recv FIN, sending RST code %X\n",
aResetCode));
GenerateRstStream(aResetCode, aStream->StreamID());
--mConcurrent;
ProcessPending();
DecrementConcurrent(aStream);
}
CloseStream(aStream, aResult);
// Remove the stream from the ID hash table. (this one isn't short, which is
// why it is hashed.)
mStreamIDHash.Remove(aStream->StreamID());
// Remove the stream from the ID hash table and, if an even id, the pushed
// table too.
uint32_t id = aStream->StreamID();
if (id > 0) {
mStreamIDHash.Remove(id);
if (!(id & 1))
mPushedStreams.RemoveElement(aStream);
}
RemoveStreamFromQueues(aStream);
// removing from the stream transaction hash will
// delete the SpdyStream3 and drop the reference to
@ -818,6 +894,29 @@ SpdySession3::CleanupStream(SpdyStream3 *aStream, nsresult aResult,
if (mShouldGoAway && !mStreamTransactionHash.Count())
Close(NS_OK);
if (pushSource) {
pushSource->SetDeferCleanupOnSuccess(false);
CleanupStream(pushSource, aResult, aResetCode);
}
}
static void RemoveStreamFromQueue(SpdyStream3 *aStream, nsDeque &queue)
{
uint32_t size = queue.GetSize();
for (uint32_t count = 0; count < size; ++count) {
SpdyStream3 *stream = static_cast<SpdyStream3 *>(queue.PopFront());
if (stream != aStream)
queue.Push(stream);
}
}
void
SpdySession3::RemoveStreamFromQueues(SpdyStream3 *aStream)
{
RemoveStreamFromQueue(aStream, mReadyForWrite);
RemoveStreamFromQueue(aStream, mQueuedStreams);
RemoveStreamFromQueue(aStream, mReadyForRead);
}
void
@ -834,23 +933,7 @@ SpdySession3::CloseStream(SpdyStream3 *aStream, nsresult aResult)
mInputFrameDataStream = nullptr;
}
// check the streams blocked on write, this is linear but the list
// should be pretty short.
uint32_t size = mReadyForWrite.GetSize();
for (uint32_t count = 0; count < size; ++count) {
SpdyStream3 *stream = static_cast<SpdyStream3 *>(mReadyForWrite.PopFront());
if (stream != aStream)
mReadyForWrite.Push(stream);
}
// Check the streams queued for activation. Because we normally accept a high
// level of parallelization this should also be short.
size = mQueuedStreams.GetSize();
for (uint32_t count = 0; count < size; ++count) {
SpdyStream3 *stream = static_cast<SpdyStream3 *>(mQueuedStreams.PopFront());
if (stream != aStream)
mQueuedStreams.Push(stream);
}
RemoveStreamFromQueues(aStream);
// Send the stream the close() indication
aStream->Close(aResult);
@ -871,9 +954,10 @@ SpdySession3::HandleSynStream(SpdySession3 *self)
PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[2]);
uint32_t associatedID =
PR_ntohl(reinterpret_cast<uint32_t *>(self->mInputFrameBuffer.get())[3]);
uint8_t flags = reinterpret_cast<uint8_t *>(self->mInputFrameBuffer.get())[4];
LOG3(("SpdySession3::HandleSynStream %p recv SYN_STREAM (push) "
"for ID 0x%X associated with 0x%X.",
"for ID 0x%X associated with 0x%X.\n",
self, streamID, associatedID));
if (streamID & 0x01) { // test for odd stream ID
@ -882,6 +966,12 @@ SpdySession3::HandleSynStream(SpdySession3 *self)
return NS_ERROR_ILLEGAL_VALUE;
}
// confirm associated-to
nsresult rv = self->SetInputFrameDataStream(associatedID);
if (NS_FAILED(rv))
return rv;
SpdyStream3 *associatedStream = self->mInputFrameDataStream;
++(self->mServerPushedResources);
// Anytime we start using the high bit of stream ID (either client or server)
@ -889,17 +979,127 @@ SpdySession3::HandleSynStream(SpdySession3 *self)
if (streamID >= kMaxStreamID)
self->mShouldGoAway = true;
bool resetStream = true;
SpdyPushCache3 *cache = nullptr;
if (!(flags & kFlag_Data_UNI)) {
// pushed streams require UNIDIRECTIONAL flag
LOG3(("SpdySession3::HandleSynStream %p ID %0x%X associated ID 0x%X failed.\n",
self, streamID, associatedID));
self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID);
} else if (!associatedID) {
// associated stream 0 will never find a match, but the spec requires a
// PROTOCOL_ERROR in this specific case
LOG3(("SpdySession3::HandleSynStream %p associated ID of 0 failed.\n", self));
self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID);
} else if (!gHttpHandler->AllowSpdyPush()) {
// MAX_CONCURRENT_STREAMS of 0 in settings should have disabled push,
// but some servers are buggy about that.. or the config could have
// been updated after the settings frame was sent. In both cases just
// reject the pushed stream as refused
LOG3(("SpdySession3::HandleSynStream Push Recevied when Disabled\n"));
self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
} else if (!associatedStream) {
LOG3(("SpdySession3::HandleSynStream %p lookup associated ID failed.\n", self));
self->GenerateRstStream(RST_INVALID_STREAM, streamID);
} else {
nsILoadGroupConnectionInfo *loadGroupCI = associatedStream->LoadGroupConnectionInfo();
if (loadGroupCI) {
loadGroupCI->GetSpdyPushCache3(&cache);
if (!cache) {
cache = new SpdyPushCache3();
if (!cache || NS_FAILED(loadGroupCI->SetSpdyPushCache3(cache))) {
delete cache;
cache = nullptr;
}
}
}
if (!cache) {
// this is unexpected, but we can handle it just be refusing the push
LOG3(("SpdySession3::HandleSynStream Push Recevied without loadgroup cache\n"));
self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
}
else {
resetStream = false;
}
}
if (resetStream) {
// Need to decompress the headers even though we aren't using them yet in
// order to keep the compression context consistent for other syn_reply frames
nsresult rv =
self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10);
rv = self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10);
if (NS_FAILED(rv)) {
LOG(("SpdySession3::HandleSynStream uncompress failed\n"));
return rv;
}
self->ResetDownstreamState();
return NS_OK;
}
// Create the buffering transaction and push stream
nsRefPtr<SpdyPush3TransactionBuffer> transactionBuffer =
new SpdyPush3TransactionBuffer();
transactionBuffer->SetConnection(self);
SpdyPushedStream3 *pushedStream =
new SpdyPushedStream3(transactionBuffer, self,
associatedStream, streamID);
// ownership of the pushed stream is by the transaction hash, just as it
// is for a client initiated stream. Errors that aren't fatal to the
// whole session must call cleanupStream() after this point in order
// to remove the stream from that hash.
self->mStreamTransactionHash.Put(transactionBuffer, pushedStream);
self->mPushedStreams.AppendElement(pushedStream);
// The pushed stream is unidirectional so it is fully open immediately
pushedStream->SetFullyOpen();
// Uncompress the response headers into a stream specific buffer, leaving them
// in spdy format for the time being.
rv = pushedStream->Uncompress(&self->mDownstreamZlib,
self->mInputFrameBuffer + 18,
self->mInputFrameDataSize - 10);
if (NS_FAILED(rv)) {
LOG(("SpdySession3::HandleSynStream uncompress failed\n"));
return rv;
}
// todo populate cache. For now, just reject server push p3
self->GenerateRstStream(RST_REFUSED_STREAM, streamID);
if (self->RegisterStreamID(pushedStream, streamID) == kDeadStreamID) {
LOG(("SpdySession3::HandleSynStream registerstreamid failed\n"));
return NS_ERROR_FAILURE;
}
// Fake the request side of the pushed HTTP transaction. Sets up hash
// key and origin
uint32_t notUsed;
pushedStream->ReadSegments(nullptr, 1, &notUsed);
nsAutoCString key;
if (!pushedStream->GetHashKey(key)) {
LOG(("SpdySession3::HandleSynStream one of :host :scheme :path missing from push\n"));
self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
self->ResetDownstreamState();
return NS_OK;
}
if (!associatedStream->Origin().Equals(pushedStream->Origin())) {
LOG(("SpdySession3::HandleSynStream pushed stream mismatched origin\n"));
self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
self->ResetDownstreamState();
return NS_OK;
}
if (!cache->RegisterPushedStream(key, pushedStream)) {
LOG(("SpdySession3::HandleSynStream registerPushedStream Failed\n"));
self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM);
self->ResetDownstreamState();
return NS_OK;
}
self->ResetDownstreamState();
return NS_OK;
}
@ -1267,8 +1467,8 @@ SpdySession3::HandleGoAway(SpdySession3 *self)
self->mCleanShutdown = true;
// Find streams greater than the last-good ID and mark them for deletion
// in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. They can
// be restarted.
// in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. The
// underlying transaction can be restarted.
self->mStreamTransactionHash.Enumerate(GoAwayEnumerator, self);
// Process the streams marked for deletion and restart.
@ -1330,7 +1530,7 @@ SpdySession3::HandleHeaders(SpdySession3 *self)
if (NS_FAILED(self->UncompressAndDiscard(12,
self->mInputFrameDataSize - 4))) {
LOG(("SpdySession3::HandleSynReply uncompress failed\n"));
LOG(("SpdySession3::HandleHeaders uncompress failed\n"));
// this is fatal to the session
return NS_ERROR_FAILURE;
}
@ -1466,8 +1666,9 @@ SpdySession3::OnTransportStatus(nsITransport* aTransport,
case NS_NET_STATUS_CONNECTED_TO:
{
SpdyStream3 *target = mStreamIDHash.Get(1);
if (target)
target->Transaction()->OnTransportStatus(aTransport, aStatus, aProgress);
nsAHttpTransaction *transaction = target ? target->Transaction() : nullptr;
if (transaction)
transaction->OnTransportStatus(aTransport, aStatus, aProgress);
break;
}
@ -1626,6 +1827,42 @@ SpdySession3::WriteSegments(nsAHttpSegmentWriter *writer,
SetWriteCallbacks();
// If there are http transactions attached to a push stream with filled buffers
// trigger that data pump here. This only reads from buffers (not the network)
// so mDownstreamState doesn't matter.
SpdyStream3 *pushConnectedStream =
static_cast<SpdyStream3 *>(mReadyForRead.PopFront());
if (pushConnectedStream) {
LOG3(("SpdySession3::WriteSegments %p processing pushed stream 0x%X\n",
this, pushConnectedStream->StreamID()));
mSegmentWriter = writer;
rv = pushConnectedStream->WriteSegments(this, count, countWritten);
mSegmentWriter = nullptr;
// The pipe in nsHttpTransaction rewrites CLOSED error codes into OK
// so we need this check to determine the truth.
if (NS_SUCCEEDED(rv) && !*countWritten &&
pushConnectedStream->PushSource() &&
pushConnectedStream->PushSource()->GetPushComplete()) {
rv = NS_BASE_STREAM_CLOSED;
}
if (rv == NS_BASE_STREAM_CLOSED) {
CleanupStream(pushConnectedStream, NS_OK, RST_CANCEL);
rv = NS_OK;
}
// if we return OK to nsHttpConnection it will use mSocketInCondition
// to determine whether to schedule more reads, incorrectly
// assuming that nsHttpConnection::OnSocketWrite() was called.
if (NS_SUCCEEDED(rv) || rv == NS_BASE_STREAM_WOULD_BLOCK) {
rv = NS_BASE_STREAM_WOULD_BLOCK;
ResumeRecv();
}
return rv;
}
// We buffer all control frames and act on them in this layer.
// We buffer the first 8 bytes of data frames (the header) but
// the actual data is passed through unprocessed.
@ -1860,9 +2097,9 @@ SpdySession3::WriteSegments(nsAHttpSegmentWriter *writer,
return rv;
}
MOZ_ASSERT(mDownstreamState == BUFFERING_CONTROL_FRAME);
if (mDownstreamState != BUFFERING_CONTROL_FRAME) {
// this cannot happen
MOZ_ASSERT(false, "Not in Bufering Control Frame State");
return NS_ERROR_UNEXPECTED;
}
@ -1918,27 +2155,34 @@ SpdySession3::UpdateLocalRwin(SpdyStream3 *stream,
if (!stream || stream->RecvdFin())
return;
LOG3(("SpdySession3::UpdateLocalRwin %p 0x%X %d\n",
this, stream->StreamID(), bytes));
stream->DecrementLocalWindow(bytes);
// Don't necessarily ack every data packet. Only do it
// after a significant amount of data.
uint64_t unacked = stream->LocalUnAcked();
int64_t localWindow = stream->LocalWindow();
if (unacked < kMinimumToAck) {
// Sanity check to make sure this won't let the window drop below 1MB
PR_STATIC_ASSERT(kMinimumToAck < kInitialRwin);
PR_STATIC_ASSERT((kInitialRwin - kMinimumToAck) > 1024 * 1024);
LOG3(("SpdySession3::UpdateLocalRwin this=%p id=0x%X newbytes=%u "
"unacked=%llu localWindow=%lld\n",
this, stream->StreamID(), bytes, unacked, localWindow));
if (!unacked)
return;
if ((unacked < kMinimumToAck) && (localWindow > kEmergencyWindowThreshold))
return;
if (!stream->HasSink()) {
LOG3(("SpdySession3::UpdateLocalRwin %p 0x%X Pushed Stream Has No Sink\n",
this, stream->StreamID()));
return;
}
// Generate window updates directly out of spdysession instead of the stream
// in order to avoid queue delays in getting the ACK out.
// in order to avoid queue delays in getting the 'ACK' out.
uint32_t toack = (unacked <= 0x7fffffffU) ? unacked : 0x7fffffffU;
LOG3(("SpdySession3::UpdateLocalRwin Ack %p 0x%X %d\n",
LOG3(("SpdySession3::UpdateLocalRwin Ack this=%p id=0x%X acksize=%d\n",
this, stream->StreamID(), toack));
stream->IncrementLocalWindow(toack);
@ -2205,6 +2449,13 @@ SpdySession3::SetNeedsCleanup()
ResetDownstreamState();
}
void
SpdySession3::ConnectPushedStream(SpdyStream3 *stream)
{
mReadyForRead.Push(stream);
ForceRecv();
}
//-----------------------------------------------------------------------------
// Modified methods of nsAHttpConnection
//-----------------------------------------------------------------------------

Просмотреть файл

@ -10,14 +10,13 @@
// http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3
#include "ASpdySession.h"
#include "mozilla/Attributes.h"
#include "nsClassHashtable.h"
#include "nsDataHashtable.h"
#include "nsDeque.h"
#include "nsHashKeys.h"
#include "zlib.h"
#include "mozilla/Attributes.h"
class nsHttpConnection;
class nsISocketTransport;
namespace mozilla { namespace net {
@ -49,7 +48,8 @@ public:
// Idle time represents time since "goodput".. e.g. a data or header frame
PRIntervalTime IdleTime();
uint32_t RegisterStreamID(SpdyStream3 *);
// Registering with a newID of 0 means pick the next available odd ID
uint32_t RegisterStreamID(SpdyStream3 *, uint32_t aNewID = 0);
const static uint8_t kVersion = 3;
@ -134,11 +134,9 @@ public:
// 31 bit stream ID.
const static uint32_t kDeadStreamID = 0xffffdead;
// until we have an API that can push back on receiving data (right now
// WriteSegments is obligated to accept data and buffer) there is no
// reason to throttle with the rwin other than in server push
// scenarios.
const static uint32_t kInitialRwin = 256 * 1024 * 1024;
// below the emergency threshold of local window we ack every received
// byte. Above that we coalesce bytes into the MinimumToAck size.
const static int32_t kEmergencyWindowThreshold = 1024 * 1024;
const static uint32_t kMinimumToAck = 64 * 1024;
// The default peer rwin is 64KB unless updated by a settings frame
@ -174,8 +172,18 @@ public:
uint32_t GetServerInitialWindow() { return mServerInitialWindow; }
void ConnectPushedStream(SpdyStream3 *stream);
uint64_t Serial() { return mSerial; }
void PrintDiagnostics (nsCString &log);
// Streams need access to these
uint32_t SendingChunkSize() { return mSendingChunkSize; }
uint32_t PushAllowance() { return mPushAllowance; }
z_stream *UpstreamZlib() { return &mUpstreamZlib; }
nsISocketTransport *SocketTransport() { return mSocketTransport; }
private:
enum stateType {
@ -192,6 +200,7 @@ private:
void ChangeDownstreamState(enum stateType);
void ResetDownstreamState();
nsresult UncompressAndDiscard(uint32_t, uint32_t);
void DecrementConcurrent(SpdyStream3 *);
void zlibInit();
void GeneratePing(uint32_t);
void GenerateRstStream(uint32_t, uint32_t);
@ -199,6 +208,7 @@ private:
void CleanupStream(SpdyStream3 *, nsresult, rstReason);
void CloseStream(SpdyStream3 *, nsresult);
void GenerateSettings();
void RemoveStreamFromQueues(SpdyStream3 *);
void SetWriteCallbacks();
void FlushOutputQueue();
@ -229,7 +239,7 @@ private:
nsAutoPtr<SpdyStream3> &,
void *);
// This is intended to be nsHttpConnectionMgr:nsHttpConnectionHandle taken
// This is intended to be nsHttpConnectionMgr:nsConnectionHandle taken
// from the first transaction on this session. That object contains the
// pointer to the real network-level nsHttpConnection object.
nsRefPtr<nsAHttpConnection> mConnection;
@ -246,20 +256,25 @@ private:
uint32_t mSendingChunkSize; /* the transmission chunk size */
uint32_t mNextStreamID; /* 24 bits */
uint32_t mConcurrentHighWater; /* max parallelism on session */
uint32_t mPushAllowance; /* rwin for unmatched pushes */
stateType mDownstreamState; /* in frame, between frames, etc.. */
// Maintain 4 indexes - one by stream ID, one by transaction ptr,
// one list of streams ready to write, one list of streams that are queued
// due to max parallelism settings. The objects
// are not ref counted - they get destroyed
// Maintain 2 indexes - one by stream ID, one by transaction pointer.
// There are also several lists of streams: ready to write, queued due to
// max parallelism, streams that need to force a read for push, and the full
// set of pushed streams.
// The objects are not ref counted - they get destroyed
// by the nsClassHashtable implementation when they are removed from
// the transaction hash.
nsDataHashtable<nsUint32HashKey, SpdyStream3 *> mStreamIDHash;
nsClassHashtable<nsPtrHashKey<nsAHttpTransaction>,
SpdyStream3> mStreamTransactionHash;
nsDeque mReadyForWrite;
nsDeque mQueuedStreams;
nsDeque mReadyForRead;
nsTArray<SpdyPushedStream3 *> mPushedStreams;
// Compression contexts for header transport using deflate.
// SPDY compresses only HTTP headers and does not reset zlib in between
@ -361,6 +376,11 @@ private:
// used as a temporary buffer while enumerating the stream hash during GoAway
nsDeque mGoAwayStreamsToRestart;
// Each session gets a unique serial number because the push cache is correlated
// by the load group and the serial number can be used as part of the cache key
// to make sure streams aren't shared across sessions.
uint64_t mSerial;
};
}} // namespace mozilla::net

Просмотреть файл

@ -4,16 +4,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nsHttp.h"
#include "SpdySession3.h"
#include "SpdyStream3.h"
#include "nsAlgorithm.h"
#include "prnetdb.h"
#include "nsHttpRequestHead.h"
#include "mozilla/Telemetry.h"
#include "nsAlgorithm.h"
#include "nsHttp.h"
#include "nsHttpHandler.h"
#include "nsHttpRequestHead.h"
#include "nsISocketTransport.h"
#include "nsISupportsPriority.h"
#include "nsHttpHandler.h"
#include "prnetdb.h"
#include "SpdyPush3.h"
#include "SpdySession3.h"
#include "SpdyStream3.h"
#include <algorithm>
#ifdef DEBUG
@ -26,21 +28,18 @@ namespace net {
SpdyStream3::SpdyStream3(nsAHttpTransaction *httpTransaction,
SpdySession3 *spdySession,
nsISocketTransport *socketTransport,
uint32_t chunkSize,
z_stream *compressionContext,
int32_t priority)
: mUpstreamState(GENERATING_SYN_STREAM),
mTransaction(httpTransaction),
: mStreamID(0),
mSession(spdySession),
mSocketTransport(socketTransport),
mUpstreamState(GENERATING_SYN_STREAM),
mSynFrameComplete(0),
mSentFinOnData(0),
mTransaction(httpTransaction),
mSocketTransport(spdySession->SocketTransport()),
mSegmentReader(nullptr),
mSegmentWriter(nullptr),
mStreamID(0),
mChunkSize(chunkSize),
mSynFrameComplete(0),
mChunkSize(spdySession->SendingChunkSize()),
mRequestBlockedOnRead(0),
mSentFinOnData(0),
mRecvdFin(0),
mFullyOpen(0),
mSentWaitingFor(0),
@ -49,23 +48,25 @@ SpdyStream3::SpdyStream3(nsAHttpTransaction *httpTransaction,
mTxInlineFrameSize(SpdySession3::kDefaultBufferSize),
mTxInlineFrameUsed(0),
mTxStreamFrameSize(0),
mZlib(compressionContext),
mZlib(spdySession->UpstreamZlib()),
mDecompressBufferSize(SpdySession3::kDefaultBufferSize),
mDecompressBufferUsed(0),
mDecompressedBytes(0),
mRequestBodyLenRemaining(0),
mPriority(priority),
mLocalWindow(SpdySession3::kInitialRwin),
mLocalUnacked(0),
mBlockedOnRwin(false),
mTotalSent(0),
mTotalRead(0)
mTotalRead(0),
mPushSource(nullptr)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
LOG3(("SpdyStream3::SpdyStream3 %p", this));
mRemoteWindow = spdySession->GetServerInitialWindow();
mLocalWindow = spdySession->PushAllowance();
mTxInlineFrame = new uint8_t[mTxInlineFrameSize];
mDecompressBuffer = new char[mDecompressBufferSize];
}
@ -186,15 +187,16 @@ SpdyStream3::WriteSegments(nsAHttpSegmentWriter *writer,
uint32_t count,
uint32_t *countWritten)
{
LOG3(("SpdyStream3::WriteSegments %p count=%d state=%x",
this, count, mUpstreamState));
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(!mSegmentWriter, "segment writer in progress");
LOG3(("SpdyStream3::WriteSegments %p count=%d state=%x",
this, count, mUpstreamState));
mSegmentWriter = writer;
nsresult rv = mTransaction->WriteSegments(writer, count, countWritten);
nsresult rv = mTransaction->WriteSegments(this, count, countWritten);
mSegmentWriter = nullptr;
return rv;
}
@ -210,6 +212,26 @@ SpdyStream3::hdrHashEnumerate(const nsACString &key,
return PL_DHASH_NEXT;
}
void
SpdyStream3::CreatePushHashKey(const nsCString &scheme,
const nsCString &hostHeader,
uint64_t serial,
const nsCSubstring &pathInfo,
nsCString &outOrigin,
nsCString &outKey)
{
outOrigin = scheme;
outOrigin.Append(NS_LITERAL_CSTRING("://"));
outOrigin.Append(hostHeader);
outKey = outOrigin;
outKey.Append(NS_LITERAL_CSTRING("/[spdy3."));
outKey.AppendInt(serial);
outKey.Append(NS_LITERAL_CSTRING("]"));
outKey.Append(pathInfo);
}
nsresult
SpdyStream3::ParseHttpRequestHeaders(const char *buf,
uint32_t avail,
@ -247,6 +269,44 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf,
*countUsed = avail - (oldLen - endHeader) + 4;
mSynFrameComplete = 1;
nsCString hostHeader;
nsCString hashkey;
mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader);
CreatePushHashKey(NS_LITERAL_CSTRING("https"),
hostHeader, mSession->Serial(),
mTransaction->RequestHead()->RequestURI(),
mOrigin, hashkey);
// check the push cache for GET
if (mTransaction->RequestHead()->Method() == nsHttp::Get) {
// from :scheme, :host, :path
nsILoadGroupConnectionInfo *loadGroupCI = mTransaction->LoadGroupConnectionInfo();
SpdyPushCache3 *cache = nullptr;
if (loadGroupCI)
loadGroupCI->GetSpdyPushCache3(&cache);
SpdyPushedStream3 *pushedStream = nullptr;
// we remove the pushedstream from the push cache so that
// it will not be used for another GET. This does not destroy the
// stream itself - that is done when the transactionhash is done with it.
if (cache)
pushedStream = cache->RemovePushedStream(hashkey);
if (pushedStream) {
LOG3(("Pushed Stream Match located id=0x%X key=%s\n",
pushedStream->StreamID(), hashkey.get()));
pushedStream->SetConsumerStream(this);
mPushSource = pushedStream;
mSentFinOnData = 1;
// There is probably pushed data buffered so trigger a read manually
// as we can't rely on future network events to do it
mSession->ConnectPushedStream(this);
return NS_OK;
}
}
// It is now OK to assign a streamID that we are assured will
// be monotonically increasing amongst syn-streams on this
// session
@ -309,11 +369,6 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf,
// The client cert "slot". Right now we don't send client certs
mTxInlineFrame[17] = 0;
const char *methodHeader = mTransaction->RequestHead()->Method().get();
nsCString hostHeader;
mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader);
nsCString versionHeader;
if (mTransaction->RequestHead()->Version() == NS_HTTP_VERSION_1_1)
versionHeader = NS_LITERAL_CSTRING("HTTP/1.1");
@ -388,6 +443,8 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf,
// contain auth. The http transaction already logs the sanitized request
// headers at this same level so it is not necessary to do so here.
const char *methodHeader = mTransaction->RequestHead()->Method().get();
// The header block length
uint16_t count = hdrHash.Count() + 5; /* method, path, version, host, scheme */
CompressToFrame(count);
@ -452,6 +509,75 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf,
return NS_OK;
}
void
SpdyStream3::AdjustInitialWindow()
{
MOZ_ASSERT(mSession->PushAllowance() <= ASpdySession::kInitialRwin);
// The session initial_window is sized for serverpushed streams. When we
// generate a client pulled stream we want to adjust the initial window
// to a huge value in a pipeline with that SYN_STREAM.
// >0 even numbered IDs are pushed streams.
// odd numbered IDs are pulled streams.
// 0 is the sink for a pushed stream.
SpdyStream3 *stream = this;
if (!mStreamID) {
MOZ_ASSERT(mPushSource);
if (!mPushSource)
return;
stream = mPushSource;
MOZ_ASSERT(stream->mStreamID);
MOZ_ASSERT(!(stream->mStreamID & 1)); // is a push stream
// If the pushed stream has sent a FIN, there is no reason to update
// the window
if (stream->RecvdFin())
return;
}
// For server pushes we also want to include in the ack any data that has been
// buffered but unacknowledged.
// mLocalUnacked is technically 64 bits, but because it can never grow larger than
// our window size (which is closer to 29bits max) we know it fits comfortably in 32.
// However we don't really enforce that, and track it as a 64 so that broken senders
// can still interoperate. That means we have to be careful with this calculation.
uint64_t toack64 = (ASpdySession::kInitialRwin - mSession->PushAllowance()) +
stream->mLocalUnacked;
stream->mLocalUnacked = 0;
if (toack64 > 0x7fffffff) {
stream->mLocalUnacked = toack64 - 0x7fffffff;
toack64 = 0x7fffffff;
}
uint32_t toack = static_cast<uint32_t>(toack64);
if (!toack)
return;
toack = PR_htonl(toack);
SpdySession3::EnsureBuffer(mTxInlineFrame,
mTxInlineFrameUsed + 16,
mTxInlineFrameUsed,
mTxInlineFrameSize);
unsigned char *packet = mTxInlineFrame.get() + mTxInlineFrameUsed;
mTxInlineFrameUsed += 16;
memset(packet, 0, 8);
packet[0] = SpdySession3::kFlag_Control;
packet[1] = SpdySession3::kVersion;
packet[3] = SpdySession3::CONTROL_TYPE_WINDOW_UPDATE;
packet[7] = 8; // 8 data bytes after 8 byte header
uint32_t id = PR_htonl(stream->mStreamID);
memcpy(packet + 8, &id, 4);
memcpy(packet + 12, &toack, 4);
stream->mLocalWindow += PR_ntohl(toack);
LOG3(("AdjustInitialwindow %p 0x%X %u\n",
this, stream->mStreamID, PR_ntohl(toack)));
}
void
SpdyStream3::UpdateTransportReadEvents(uint32_t count)
{
@ -510,6 +636,16 @@ SpdyStream3::TransmitFrame(const char *buf,
// flush internal buffers that were previously blocked on writing. You can
// of course feed new data to it as well.
LOG3(("SpdyStream3::TransmitFrame %p inline=%d stream=%d",
this, mTxInlineFrameUsed, mTxStreamFrameSize));
if (countUsed)
*countUsed = 0;
if (!mTxInlineFrameUsed) {
MOZ_ASSERT(!buf);
return NS_OK;
}
MOZ_ASSERT(mTxInlineFrameUsed, "empty stream frame in transmit");
MOZ_ASSERT(mSegmentReader, "TransmitFrame with null mSegmentReader");
MOZ_ASSERT((buf && countUsed) || (!buf && !countUsed),
@ -518,11 +654,6 @@ SpdyStream3::TransmitFrame(const char *buf,
uint32_t transmittedCount;
nsresult rv;
LOG3(("SpdyStream3::TransmitFrame %p inline=%d stream=%d",
this, mTxInlineFrameUsed, mTxStreamFrameSize));
if (countUsed)
*countUsed = 0;
// In the (relatively common) event that we have a small amount of data
// split between the inlineframe and the streamframe, then move the stream
// data into the inlineframe via copy in order to coalesce into one write.
@ -1071,8 +1202,7 @@ SpdyStream3::ConvertHeaders(nsACString &aHeadersOut)
return NS_ERROR_ILLEGAL_VALUE;
// spdy transport level headers shouldn't be gatewayed into http/1
if (!nameString.Equals(NS_LITERAL_CSTRING(":version")) &&
!nameString.Equals(NS_LITERAL_CSTRING(":status")) &&
if (!nameString.IsEmpty() && nameString[0] != ':' &&
!nameString.Equals(NS_LITERAL_CSTRING("connection")) &&
!nameString.Equals(NS_LITERAL_CSTRING("keep-alive"))) {
nsDependentCSubstring valueString =
@ -1224,7 +1354,7 @@ SpdyStream3::OnReadSegment(const char *buf,
LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d",
this, *countRead, count, mSynFrameComplete));
if (mSynFrameComplete) {
MOZ_ASSERT(mTxInlineFrameUsed, "OnReadSegment SynFrameComplete 0b");
AdjustInitialWindow();
rv = TransmitFrame(nullptr, nullptr, true);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
// this can't happen
@ -1309,13 +1439,22 @@ SpdyStream3::OnWriteSegment(char *buf,
uint32_t count,
uint32_t *countWritten)
{
LOG3(("SpdyStream3::OnWriteSegment %p count=%d state=%x",
this, count, mUpstreamState));
LOG3(("SpdyStream3::OnWriteSegment %p count=%d state=%x 0x%X\n",
this, count, mUpstreamState, mStreamID));
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
MOZ_ASSERT(mSegmentWriter, "OnWriteSegment with null mSegmentWriter");
MOZ_ASSERT(mSegmentWriter);
if (!mPushSource)
return mSegmentWriter->OnWriteSegment(buf, count, countWritten);
nsresult rv;
rv = mPushSource->GetBufferedData(buf, count, countWritten);
if (NS_FAILED(rv))
return rv;
mSession->ConnectPushedStream(this);
return NS_OK;
}
} // namespace mozilla::net

Просмотреть файл

@ -6,26 +6,28 @@
#ifndef mozilla_net_SpdyStream3_h
#define mozilla_net_SpdyStream3_h
#include "nsAHttpTransaction.h"
#include "mozilla/Attributes.h"
#include "nsAHttpTransaction.h"
namespace mozilla { namespace net {
class SpdyStream3 MOZ_FINAL : public nsAHttpSegmentReader
class SpdyStream3 : public nsAHttpSegmentReader
, public nsAHttpSegmentWriter
{
public:
NS_DECL_NSAHTTPSEGMENTREADER
NS_DECL_NSAHTTPSEGMENTWRITER
SpdyStream3(nsAHttpTransaction *,
SpdySession3 *, nsISocketTransport *,
uint32_t, z_stream *, int32_t);
SpdyStream3(nsAHttpTransaction *, SpdySession3 *, int32_t);
uint32_t StreamID() { return mStreamID; }
SpdyPushedStream3 *PushSource() { return mPushSource; }
nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *);
nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *);
virtual nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *);
virtual nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *);
virtual bool DeferCleanupOnSuccess() { return false; }
const nsAFlatCString &Origin() const { return mOrigin; }
bool RequestBlockedOnRead()
{
@ -42,9 +44,10 @@ public:
bool HasRegisteredID() { return mStreamID != 0; }
nsAHttpTransaction *Transaction()
nsAHttpTransaction *Transaction() { return mTransaction; }
virtual nsILoadGroupConnectionInfo *LoadGroupConnectionInfo()
{
return mTransaction;
return mTransaction ? mTransaction->LoadGroupConnectionInfo() : nullptr;
}
void Close(nsresult reason);
@ -81,15 +84,25 @@ public:
}
uint64_t LocalUnAcked() { return mLocalUnacked; }
int64_t LocalWindow() { return mLocalWindow; }
bool BlockedOnRwin() { return mBlockedOnRwin; }
private:
// A pull stream has an implicit sink, a pushed stream has a sink
// once it is matched to a pull stream.
virtual bool HasSink() { return true; }
// a SpdyStream3 object is only destroyed by being removed from the
// SpdySession3 mStreamTransactionHash - make the dtor private to
// just the AutoPtr implementation needed for that hash.
friend class nsAutoPtr<SpdyStream3>;
~SpdyStream3();
virtual ~SpdyStream3();
protected:
nsresult FindHeader(nsCString, nsDependentCSubstring &);
static void CreatePushHashKey(const nsCString &scheme,
const nsCString &hostHeader,
uint64_t serial,
const nsCSubstring &pathInfo,
nsCString &outOrigin,
nsCString &outKey);
enum stateType {
GENERATING_SYN_STREAM,
@ -99,12 +112,36 @@ private:
UPSTREAM_COMPLETE
};
uint32_t mStreamID;
// The session that this stream is a subset of
SpdySession3 *mSession;
nsCString mOrigin;
// Each stream goes from syn_stream to upstream_complete, perhaps
// looping on multiple instances of generating_request_body and
// sending_request_body for each SPDY chunk in the upload.
enum stateType mUpstreamState;
// Flag is set when all http request headers have been read and ID is stable
uint32_t mSynFrameComplete : 1;
// Flag is set when a FIN has been placed on a data or syn packet
// (i.e after the client has closed)
uint32_t mSentFinOnData : 1;
void ChangeState(enum stateType);
private:
friend class nsAutoPtr<SpdyStream3>;
static PLDHashOperator hdrHashEnumerate(const nsACString &,
nsAutoPtr<nsCString> &,
void *);
void ChangeState(enum stateType);
nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *);
void AdjustInitialWindow();
nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment);
void GenerateDataFrameHeader(uint32_t, bool);
@ -114,12 +151,6 @@ private:
void CompressToFrame(uint32_t);
void CompressFlushFrame();
void ExecuteCompress(uint32_t);
nsresult FindHeader(nsCString, nsDependentCSubstring &);
// Each stream goes from syn_stream to upstream_complete, perhaps
// looping on multiple instances of generating_request_body and
// sending_request_body for each SPDY chunk in the upload.
enum stateType mUpstreamState;
// The underlying HTTP transaction. This pointer is used as the key
// in the SpdySession3 mStreamTransactionHash so it is important to
@ -127,9 +158,6 @@ private:
// (i.e. don't change it or release it after it is set in the ctor).
nsRefPtr<nsAHttpTransaction> mTransaction;
// The session that this stream is a subset of
SpdySession3 *mSession;
// The underlying socket transport object is needed to propogate some events
nsISocketTransport *mSocketTransport;
@ -139,23 +167,13 @@ private:
nsAHttpSegmentReader *mSegmentReader;
nsAHttpSegmentWriter *mSegmentWriter;
// The 24 bit SPDY stream ID
uint32_t mStreamID;
// The quanta upstream data frames are chopped into
uint32_t mChunkSize;
// Flag is set when all http request headers have been read and ID is stable
uint32_t mSynFrameComplete : 1;
// Flag is set when the HTTP processor has more data to send
// but has blocked in doing so.
uint32_t mRequestBlockedOnRead : 1;
// Flag is set when a FIN has been placed on a data or syn packet
// (i.e after the client has closed)
uint32_t mSentFinOnData : 1;
// Flag is set after the response frame bearing the fin bit has
// been processed. (i.e. after the server has closed).
uint32_t mRecvdFin : 1;
@ -230,6 +248,9 @@ private:
// For Progress Events
uint64_t mTotalSent;
uint64_t mTotalRead;
// For SpdyPush
SpdyPushedStream3 *mPushSource;
};
}} // namespace mozilla::net

Просмотреть файл

@ -39,6 +39,7 @@ EXPORTS.mozilla.net += [
'HttpChannelParent.h',
'HttpInfo.h',
'PHttpChannelParams.h',
'PSpdyPush3.h',
]
CPP_SOURCES += [
@ -50,6 +51,7 @@ CPP_SOURCES += [
'HttpChannelParentListener.cpp',
'HttpInfo.cpp',
'NullHttpTransaction.cpp',
'SpdyPush3.cpp',
'SpdySession2.cpp',
'SpdySession3.cpp',
'SpdyStream2.cpp',

Просмотреть файл

@ -47,6 +47,10 @@ public:
virtual nsresult ResumeSend() = 0;
virtual nsresult ResumeRecv() = 0;
// called by a transaction to force a "read from network" iteration
// even if not scheduled by socket associated with connection
virtual nsresult ForceRecv() = 0;
// After a connection has had ResumeSend() called by a transaction,
// and it is ready to write to the network it may need to know the
// transaction that has data to write. This is only an issue for
@ -175,6 +179,12 @@ public:
return NS_ERROR_FAILURE; \
return (fwdObject)->ResumeRecv(); \
} \
nsresult ForceRecv() \
{ \
if (!(fwdObject)) \
return NS_ERROR_FAILURE; \
return (fwdObject)->ForceRecv(); \
} \
nsISocketTransport *Transport() \
{ \
if (!(fwdObject)) \

Просмотреть файл

@ -16,6 +16,8 @@ class nsIEventTarget;
class nsITransport;
class nsHttpRequestHead;
class nsHttpPipeline;
class nsHttpTransaction;
class nsILoadGroupConnectionInfo;
//----------------------------------------------------------------------------
// Abstract base class for a HTTP transaction:
@ -100,6 +102,12 @@ public:
virtual nsresult SetPipelinePosition(int32_t) = 0;
virtual int32_t PipelinePosition() = 0;
// Occasionally the abstract interface has to give way to base implementations
// to respect differences between spdy, pipelines, etc..
// These Query* (and IsNUllTransaction()) functions provide a way to do
// that without using xpcom or rtti. Any calling code that can't deal with
// a null response from one of them probably shouldn't be using nsAHttpTransaction
// If we used rtti this would be the result of doing
// dynamic_cast<nsHttpPipeline *>(this).. i.e. it can be nullptr for
// non pipeline implementations of nsAHttpTransaction
@ -110,6 +118,9 @@ public:
// its IO functions all the time.
virtual bool IsNullTransaction() { return false; }
// return the load group connection information associated with the transaction
virtual nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return nullptr; }
// Every transaction is classified into one of the types below. When using
// HTTP pipelines, only transactions with the same type appear on the same
// pipeline.
@ -177,7 +188,7 @@ public:
// commitment now but might in the future and forceCommitment is not true .
// (forceCommitment requires a hard failure or OK at this moment.)
//
// Spdy uses this to make sure frames are atomic.
// SpdySession uses this to make sure frames are atomic.
virtual nsresult CommitToSegmentSize(uint32_t size, bool forceCommitment)
{
return NS_ERROR_FAILURE;

Просмотреть файл

@ -1096,6 +1096,35 @@ nsHttpConnection::ResumeRecv()
return NS_ERROR_UNEXPECTED;
}
class nsHttpConnectionForceRecv : public nsRunnable
{
public:
nsHttpConnectionForceRecv(nsHttpConnection *aConn)
: mConn(aConn) {}
NS_IMETHOD Run()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (!mConn->mSocketIn)
return NS_OK;
return mConn->OnInputStreamReady(mConn->mSocketIn);
}
private:
nsRefPtr<nsHttpConnection> mConn;
};
// trigger an asynchronous read
nsresult
nsHttpConnection::ForceRecv()
{
LOG(("nsHttpConnection::ForceRecv [this=%p]\n", this));
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return NS_DispatchToCurrentThread(new nsHttpConnectionForceRecv(this));
}
void
nsHttpConnection::BeginIdleMonitoring()
{

Просмотреть файл

@ -120,6 +120,9 @@ public:
nsresult ResumeRecv();
int64_t MaxBytesRead() {return mMaxBytesRead;}
friend class nsHttpConnectionForceRecv;
nsresult ForceRecv();
static NS_METHOD ReadFromStream(nsIInputStream *, void *, const char *,
uint32_t, uint32_t, uint32_t *);

Просмотреть файл

@ -183,8 +183,10 @@ nsHttpHandler::nsHttpHandler()
, mCoalesceSpdy(true)
, mUseAlternateProtocol(false)
, mSpdyPersistentSettings(false)
, mAllowSpdyPush(true)
, mSpdySendingChunkSize(ASpdySession::kSendingChunkSize)
, mSpdySendBufferSize(ASpdySession::kTCPSendBufferSize)
, mSpdyPushAllowance(32768)
, mSpdyPingThreshold(PR_SecondsToInterval(58))
, mSpdyPingTimeout(PR_SecondsToInterval(8))
, mConnectTimeout(90000)
@ -1159,6 +1161,22 @@ nsHttpHandler::PrefsChanged(nsIPrefBranch *prefs, const char *pref)
PR_SecondsToInterval((uint16_t) clamped(val, 0, 0x7fffffff));
}
if (PREF_CHANGED(HTTP_PREF("spdy.allow-push"))) {
rv = prefs->GetBoolPref(HTTP_PREF("spdy.allow-push"),
&cVar);
if (NS_SUCCEEDED(rv))
mAllowSpdyPush = cVar;
}
if (PREF_CHANGED(HTTP_PREF("spdy.push-allowance"))) {
rv = prefs->GetIntPref(HTTP_PREF("spdy.push-allowance"), &val);
if (NS_SUCCEEDED(rv)) {
mSpdyPushAllowance =
static_cast<uint32_t>
(clamped(val, 1024, static_cast<int32_t>(ASpdySession::kInitialRwin)));
}
}
// The amount of seconds to wait for a spdy ping response before
// closing the session.
if (PREF_CHANGED(HTTP_PREF("spdy.send-buffer-size"))) {

Просмотреть файл

@ -101,8 +101,10 @@ public:
bool UseSpdyPersistentSettings() { return mSpdyPersistentSettings; }
uint32_t SpdySendingChunkSize() { return mSpdySendingChunkSize; }
uint32_t SpdySendBufferSize() { return mSpdySendBufferSize; }
uint32_t SpdyPushAllowance() { return mSpdyPushAllowance; }
PRIntervalTime SpdyPingThreshold() { return mSpdyPingThreshold; }
PRIntervalTime SpdyPingTimeout() { return mSpdyPingTimeout; }
bool AllowSpdyPush() { return mAllowSpdyPush; }
uint32_t ConnectTimeout() { return mConnectTimeout; }
uint32_t ParallelSpeculativeConnectLimit() { return mParallelSpeculativeConnectLimit; }
bool CritialRequestPrioritization() { return mCritialRequestPrioritization; }
@ -416,8 +418,10 @@ private:
bool mCoalesceSpdy;
bool mUseAlternateProtocol;
bool mSpdyPersistentSettings;
bool mAllowSpdyPush;
uint32_t mSpdySendingChunkSize;
uint32_t mSpdySendBufferSize;
uint32_t mSpdyPushAllowance;
PRIntervalTime mSpdyPingThreshold;
PRIntervalTime mSpdyPingTimeout;

Просмотреть файл

@ -115,8 +115,9 @@ public:
const mozilla::TimeStamp GetPendingTime() { return mPendingTime; }
bool UsesPipelining() const { return mCaps & NS_HTTP_ALLOW_PIPELINING; }
void SetLoadGroupConnectionInfo(nsILoadGroupConnectionInfo *aLoadGroupCI) { mLoadGroupCI = aLoadGroupCI; }
// overload of nsAHttpTransaction::LoadGroupConnectionInfo()
nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return mLoadGroupCI.get(); }
void SetLoadGroupConnectionInfo(nsILoadGroupConnectionInfo *aLoadGroupCI) { mLoadGroupCI = aLoadGroupCI; }
void DispatchedAsBlocking();
void RemoveDispatchedAsBlocking();