diff --git a/b2g/app/b2g.js b/b2g/app/b2g.js index cc475d6dadf2..4aad384ed504 100644 --- a/b2g/app/b2g.js +++ b/b2g/app/b2g.js @@ -50,6 +50,9 @@ pref("network.http.max-connections", 20); pref("network.http.max-persistent-connections-per-server", 6); pref("network.http.max-persistent-connections-per-proxy", 20); +// spdy +pref("network.http.spdy.push-allowance", 32768); + // See bug 545869 for details on why these are set the way they are pref("network.buffer.cache.count", 24); pref("network.buffer.cache.size", 16384); diff --git a/mobile/android/app/mobile.js b/mobile/android/app/mobile.js index 9907a48df0af..478164dccef4 100644 --- a/mobile/android/app/mobile.js +++ b/mobile/android/app/mobile.js @@ -95,6 +95,9 @@ pref("network.http.max-connections", 20); pref("network.http.max-persistent-connections-per-server", 6); pref("network.http.max-persistent-connections-per-proxy", 20); +// spdy +pref("network.http.spdy.push-allowance", 32768); + // See bug 545869 for details on why these are set the way they are pref("network.buffer.cache.count", 24); pref("network.buffer.cache.size", 16384); diff --git a/modules/libpref/src/init/all.js b/modules/libpref/src/init/all.js index bc818e2c16b9..114acdf4c489 100644 --- a/modules/libpref/src/init/all.js +++ b/modules/libpref/src/init/all.js @@ -1020,6 +1020,8 @@ pref("network.http.spdy.persistent-settings", false); pref("network.http.spdy.ping-threshold", 58); pref("network.http.spdy.ping-timeout", 8); pref("network.http.spdy.send-buffer-size", 131072); +pref("network.http.spdy.allow-push", true); +pref("network.http.spdy.push-allowance", 65536); pref("network.http.diagnostics", false); diff --git a/netwerk/base/public/nsILoadGroup.idl b/netwerk/base/public/nsILoadGroup.idl index a01e4981c726..0579d601a315 100644 --- a/netwerk/base/public/nsILoadGroup.idl +++ b/netwerk/base/public/nsILoadGroup.idl @@ -80,11 +80,18 @@ interface nsILoadGroup : nsIRequest readonly attribute nsILoadGroupConnectionInfo connectionInfo; }; +%{C++ +#include "mozilla/net/PSpdyPush3.h" +%} + +[ptr] native SpdyPushCache3Ptr(mozilla::net::SpdyPushCache3); + /** * Used to maintain state about the connections of a load group and * how they interact with blocking items like HEAD css/js loads. */ -[uuid(d1f9f18e-3d85-473a-ad58-a2367d7cdb2a)] + +[uuid(5361f30e-f968-437c-8f41-69d2756a6022)] interface nsILoadGroupConnectionInfo : nsISupports { /** @@ -104,4 +111,11 @@ interface nsILoadGroupConnectionInfo : nsISupports * blockers. */ unsigned long removeBlockingTransaction(); + + /* reading this attribute gives out weak pointers to the push + * cache. The nsILoadGroupConnectionInfo implemenation owns the cache + * and will destroy it when overwritten or when the load group + * ends. + */ + [noscript] attribute SpdyPushCache3Ptr spdyPushCache3; }; diff --git a/netwerk/base/src/nsLoadGroup.cpp b/netwerk/base/src/nsLoadGroup.cpp index f7dc4503385e..3096f4b5f53b 100644 --- a/netwerk/base/src/nsLoadGroup.cpp +++ b/netwerk/base/src/nsLoadGroup.cpp @@ -1054,6 +1054,7 @@ public: nsLoadGroupConnectionInfo(); private: int32_t mBlockingTransactionCount; // signed for PR_ATOMIC_* + nsAutoPtr mSpdyCache3; }; NS_IMPL_THREADSAFE_ISUPPORTS1(nsLoadGroupConnectionInfo, nsILoadGroupConnectionInfo) @@ -1087,6 +1088,20 @@ nsLoadGroupConnectionInfo::RemoveBlockingTransaction(uint32_t *_retval) return NS_OK; } +/* [noscript] attribute SpdyPushCache3Ptr spdyPushCache3; */ +NS_IMETHODIMP +nsLoadGroupConnectionInfo::GetSpdyPushCache3(mozilla::net::SpdyPushCache3 **aSpdyPushCache3) +{ + *aSpdyPushCache3 = mSpdyCache3.get(); + return NS_OK; +} +NS_IMETHODIMP +nsLoadGroupConnectionInfo::SetSpdyPushCache3(mozilla::net::SpdyPushCache3 *aSpdyPushCache3) +{ + mSpdyCache3 = aSpdyPushCache3; + return NS_OK; +} + nsresult nsLoadGroup::Init() { static PLDHashTableOps hash_table_ops = diff --git a/netwerk/protocol/http/ASpdySession.h b/netwerk/protocol/http/ASpdySession.h index 8753a70118bd..6fc630065eb6 100644 --- a/netwerk/protocol/http/ASpdySession.h +++ b/netwerk/protocol/http/ASpdySession.h @@ -42,6 +42,12 @@ public: const static uint32_t kSendingChunkSize = 4096; const static uint32_t kTCPSendBufferSize = 131072; + + // until we have an API that can push back on receiving data (right now + // WriteSegments is obligated to accept data and buffer) there is no + // reason to throttle with the rwin other than in server push + // scenarios. + const static uint32_t kInitialRwin = 256 * 1024 * 1024; }; // this is essentially a single instantiation as a member of nsHttpHandler. diff --git a/netwerk/protocol/http/PSpdyPush3.h b/netwerk/protocol/http/PSpdyPush3.h new file mode 100644 index 000000000000..1a914131ad60 --- /dev/null +++ b/netwerk/protocol/http/PSpdyPush3.h @@ -0,0 +1,60 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// SPDY Server Push as defined by +// http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3 + +/* + A pushed stream is put into a memory buffer (The SpdyPushTransactionBuffer) + and spooled there until a GET is made that can be matched up with it. At + that time we have two spdy streams - the GET (aka the sink) and the PUSH + (aka the source). Data is copied between those two streams for the lifetime + of the transaction. This is true even if the transaction buffer is empty, + partly complete, or totally loaded at the time the GET correspondence is made. + + correspondence is done through a hash table of the full url, the spdy session, + and the load group. The load group is implicit because that's where the + hash is stored, the other items comprise the hash key. + + Pushed streams are subject to aggressive flow control before they are matched + with a GET at which point flow control is effectively disabled to match the + client pull behavior. +*/ + +#ifndef mozilla_net_SpdyPush3_Public_h +#define mozilla_net_SpdyPush3_Public_h + +#include "nsAutoPtr.h" +#include "nsDataHashtable.h" +#include "nsISupports.h" + +class nsCString; + +namespace mozilla { +namespace net { + +class SpdyPushedStream3; + +// One Cache per load group +class SpdyPushCache3 +{ +public: + SpdyPushCache3(); + virtual ~SpdyPushCache3(); + + // The cache holds only weak pointers - no references + bool RegisterPushedStream(nsCString key, + SpdyPushedStream3 *stream); + SpdyPushedStream3 *RemovePushedStream(nsCString key); + SpdyPushedStream3 *GetPushedStream(nsCString key); + +private: + nsDataHashtable mHash; +}; + +} // namespace mozilla::net +} // namespace mozilla + +#endif // mozilla_net_SpdyPush3_Public_h diff --git a/netwerk/protocol/http/SpdyPush3.cpp b/netwerk/protocol/http/SpdyPush3.cpp new file mode 100644 index 000000000000..7f92239076b0 --- /dev/null +++ b/netwerk/protocol/http/SpdyPush3.cpp @@ -0,0 +1,395 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set sw=2 ts=8 et tw=80 : */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include + +#include "nsDependentString.h" +#include "SpdyPush3.h" + +namespace mozilla { +namespace net { + +////////////////////////////////////////// +// SpdyPushedStream3 +////////////////////////////////////////// + +SpdyPushedStream3::SpdyPushedStream3(SpdyPush3TransactionBuffer *aTransaction, + SpdySession3 *aSession, + SpdyStream3 *aAssociatedStream, + uint32_t aID) + :SpdyStream3(aTransaction, aSession, + 0 /* priority is only for sending, so ignore it on push */) + , mConsumerStream(nullptr) + , mBufferedPush(aTransaction) + , mStatus(NS_OK) + , mPushCompleted(false) + , mDeferCleanupOnSuccess(true) +{ + LOG3(("SpdyPushedStream3 ctor this=%p 0x%X\n", this, aID)); + mStreamID = aID; + mBufferedPush->SetPushStream(this); + mLoadGroupCI = aAssociatedStream->LoadGroupConnectionInfo(); + mLastRead = TimeStamp::Now(); +} + +bool +SpdyPushedStream3::GetPushComplete() +{ + return mPushCompleted; +} + +nsresult +SpdyPushedStream3::WriteSegments(nsAHttpSegmentWriter *writer, + uint32_t count, + uint32_t *countWritten) +{ + nsresult rv = SpdyStream3::WriteSegments(writer, count, countWritten); + if (NS_SUCCEEDED(rv) && *countWritten) { + mLastRead = TimeStamp::Now(); + } + + if (rv == NS_BASE_STREAM_CLOSED) { + mPushCompleted = true; + rv = NS_OK; // this is what a normal HTTP transaction would do + } + if (rv != NS_BASE_STREAM_WOULD_BLOCK && NS_FAILED(rv)) + mStatus = rv; + return rv; +} + +nsresult +SpdyPushedStream3::ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *count) +{ + // The SYN_STREAM for this has been processed, so we need to verify + // that :host, :scheme, and :path MUST be present + nsDependentCSubstring host, scheme, path; + nsresult rv; + + rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":host"), host); + if (NS_FAILED(rv)) { + LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X " + "push without required :host\n", mSession, mStreamID)); + return rv; + } + + rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":scheme"), scheme); + if (NS_FAILED(rv)) { + LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X " + "push without required :scheme\n", mSession, mStreamID)); + return rv; + } + + rv = SpdyStream3::FindHeader(NS_LITERAL_CSTRING(":path"), path); + if (NS_FAILED(rv)) { + LOG3(("SpdyPushedStream3::ReadSegments session=%p ID 0x%X " + "push without required :host\n", mSession, mStreamID)); + return rv; + } + + CreatePushHashKey(nsCString(scheme), nsCString(host), + mSession->Serial(), path, + mOrigin, mHashKey); + + LOG3(("SpdyPushStream3 0x%X hash key %s\n", mStreamID, mHashKey.get())); + + // the write side of a pushed transaction just involves manipulating a little state + SpdyStream3::mSentFinOnData = 1; + SpdyStream3::mSynFrameComplete = 1; + SpdyStream3::ChangeState(UPSTREAM_COMPLETE); + *count = 0; + return NS_OK; +} + +bool +SpdyPushedStream3::GetHashKey(nsCString &key) +{ + if (mHashKey.IsEmpty()) + return false; + + key = mHashKey; + return true; +} + +void +SpdyPushedStream3::ConnectPushedStream(SpdyStream3 *stream) +{ + mSession->ConnectPushedStream(stream); +} + +bool +SpdyPushedStream3::IsOrphaned(TimeStamp now) +{ + MOZ_ASSERT(!now.IsNull()); + + // if spdy is not transmitting, and is also not connected to a consumer + // stream, and its been like that for too long then it is oprhaned + + if (mConsumerStream) + return false; + + bool rv = ((now - mLastRead).ToSeconds() > 30.0); + if (rv) { + LOG3(("SpdyPushCache3::IsOrphaned 0x%X IsOrphaned %3.2f\n", + mStreamID, (now - mLastRead).ToSeconds())); + } + return rv; +} + +nsresult +SpdyPushedStream3::GetBufferedData(char *buf, + uint32_t count, + uint32_t *countWritten) +{ + if (NS_FAILED(mStatus)) + return mStatus; + + nsresult rv = mBufferedPush->GetBufferedData(buf, count, countWritten); + if (NS_FAILED(rv)) + return rv; + + if (!*countWritten) + rv = GetPushComplete() ? NS_BASE_STREAM_CLOSED : NS_BASE_STREAM_WOULD_BLOCK; + + return rv; +} + +////////////////////////////////////////// +// SpdyPushCache3 +////////////////////////////////////////// + +SpdyPushCache3::SpdyPushCache3() +{ + mHash.Init(); +} + +SpdyPushCache3::~SpdyPushCache3() +{ + mHash.Clear(); +} + +SpdyPushedStream3 * +SpdyPushCache3::GetPushedStream(nsCString key) +{ + return mHash.Get(key); +} + +bool +SpdyPushCache3::RegisterPushedStream(nsCString key, + SpdyPushedStream3 *stream) +{ + LOG3(("SpdyPushCache3::RegisterPushedStream %s 0x%X\n", + key.get(), stream->StreamID())); + if(mHash.Get(key)) + return false; + mHash.Put(key, stream); + return true; +} + +SpdyPushedStream3 * +SpdyPushCache3::RemovePushedStream(nsCString key) +{ + SpdyPushedStream3 *rv = mHash.Get(key); + LOG3(("SpdyPushCache3::RemovePushedStream %s 0x%X\n", + key.get(), rv ? rv->StreamID() : 0)); + if (rv) + mHash.Remove(key); + return rv; +} + +////////////////////////////////////////// +// SpdyPush3TransactionBuffer +// This is the nsAHttpTransction owned by the stream when the pushed +// stream has not yet been matched with a pull request +////////////////////////////////////////// + +NS_IMPL_THREADSAFE_ISUPPORTS0(SpdyPush3TransactionBuffer) + +SpdyPush3TransactionBuffer::SpdyPush3TransactionBuffer() + : mStatus(NS_OK) + , mRequestHead(nullptr) + , mPushStream(nullptr) + , mIsDone(false) + , mBufferedHTTP1Size(kDefaultBufferSize) + , mBufferedHTTP1Used(0) + , mBufferedHTTP1Consumed(0) +{ + mBufferedHTTP1 = new char[mBufferedHTTP1Size]; +} + +SpdyPush3TransactionBuffer::~SpdyPush3TransactionBuffer() +{ + delete mRequestHead; +} + +void +SpdyPush3TransactionBuffer::SetConnection(nsAHttpConnection *conn) +{ +} + +nsAHttpConnection * +SpdyPush3TransactionBuffer::Connection() +{ + return nullptr; +} + +void +SpdyPush3TransactionBuffer::GetSecurityCallbacks(nsIInterfaceRequestor **outCB) +{ + *outCB = nullptr; +} + +void +SpdyPush3TransactionBuffer::OnTransportStatus(nsITransport* transport, + nsresult status, uint64_t progress) +{ +} + +bool +SpdyPush3TransactionBuffer::IsDone() +{ + return mIsDone; +} + +nsresult +SpdyPush3TransactionBuffer::Status() +{ + return mStatus; +} + +uint32_t +SpdyPush3TransactionBuffer::Caps() +{ + return 0; +} + +uint64_t +SpdyPush3TransactionBuffer::Available() +{ + return mBufferedHTTP1Used - mBufferedHTTP1Consumed; +} + +nsresult +SpdyPush3TransactionBuffer::ReadSegments(nsAHttpSegmentReader *reader, + uint32_t count, uint32_t *countRead) +{ + *countRead = 0; + return NS_ERROR_NOT_IMPLEMENTED; +} + +nsresult +SpdyPush3TransactionBuffer::WriteSegments(nsAHttpSegmentWriter *writer, + uint32_t count, uint32_t *countWritten) +{ + if ((mBufferedHTTP1Size - mBufferedHTTP1Used) < 20480) { + SpdySession3::EnsureBuffer(mBufferedHTTP1, + mBufferedHTTP1Size + kDefaultBufferSize, + mBufferedHTTP1Used, + mBufferedHTTP1Size); + } + + count = std::min(count, mBufferedHTTP1Size - mBufferedHTTP1Used); + nsresult rv = writer->OnWriteSegment(mBufferedHTTP1 + mBufferedHTTP1Used, + count, countWritten); + if (NS_SUCCEEDED(rv)) { + mBufferedHTTP1Used += *countWritten; + } + else if (rv == NS_BASE_STREAM_CLOSED) { + mIsDone = true; + } + + if (Available()) { + SpdyStream3 *consumer = mPushStream->GetConsumerStream(); + + if (consumer) { + LOG3(("SpdyPush3TransactionBuffer::WriteSegments notifying connection " + "consumer data available 0x%X [%u]\n", + mPushStream->StreamID(), Available())); + mPushStream->ConnectPushedStream(consumer); + } + } + + return rv; +} + +uint32_t +SpdyPush3TransactionBuffer::Http1xTransactionCount() +{ + return 0; +} + +nsHttpRequestHead * +SpdyPush3TransactionBuffer::RequestHead() +{ + if (!mRequestHead) + mRequestHead = new nsHttpRequestHead(); + return mRequestHead; +} + +nsresult +SpdyPush3TransactionBuffer::TakeSubTransactions( + nsTArray > &outTransactions) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +void +SpdyPush3TransactionBuffer::SetProxyConnectFailed() +{ +} + +void +SpdyPush3TransactionBuffer::Close(nsresult reason) +{ + mStatus = reason; + mIsDone = true; +} + +nsresult +SpdyPush3TransactionBuffer::AddTransaction(nsAHttpTransaction *trans) +{ + return NS_ERROR_NOT_IMPLEMENTED; +} + +uint32_t +SpdyPush3TransactionBuffer::PipelineDepth() +{ + return 0; +} + +nsresult +SpdyPush3TransactionBuffer::SetPipelinePosition(int32_t position) +{ + return NS_OK; +} + +int32_t +SpdyPush3TransactionBuffer::PipelinePosition() +{ + return 1; +} + +nsresult +SpdyPush3TransactionBuffer::GetBufferedData(char *buf, + uint32_t count, + uint32_t *countWritten) +{ + *countWritten = std::min(count, static_cast(Available())); + if (*countWritten) { + memcpy(buf, mBufferedHTTP1 + mBufferedHTTP1Consumed, *countWritten); + mBufferedHTTP1Consumed += *countWritten; + } + + // If all the data has been consumed then reset the buffer + if (mBufferedHTTP1Consumed == mBufferedHTTP1Used) { + mBufferedHTTP1Consumed = 0; + mBufferedHTTP1Used = 0; + } + + return NS_OK; +} + +} // namespace mozilla::net +} // namespace mozilla diff --git a/netwerk/protocol/http/SpdyPush3.h b/netwerk/protocol/http/SpdyPush3.h new file mode 100644 index 000000000000..b3fa0dbd2b8d --- /dev/null +++ b/netwerk/protocol/http/SpdyPush3.h @@ -0,0 +1,102 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +// SPDY Server Push as defined by +// http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3 + +#ifndef mozilla_net_SpdyPush3_Internal_h +#define mozilla_net_SpdyPush3_Internal_h + +#include "mozilla/Attributes.h" +#include "mozilla/TimeStamp.h" +#include "nsHttpRequestHead.h" +#include "nsILoadGroup.h" +#include "nsString.h" +#include "PSpdyPush3.h" +#include "SpdySession3.h" +#include "SpdyStream3.h" + +namespace mozilla { +namespace net { + +class SpdyPush3TransactionBuffer; + +class SpdyPushedStream3 MOZ_FINAL : public SpdyStream3 +{ +public: + SpdyPushedStream3(SpdyPush3TransactionBuffer *aTransaction, + SpdySession3 *aSession, + SpdyStream3 *aAssociatedStream, + uint32_t aID); + virtual ~SpdyPushedStream3() {} + + bool GetPushComplete(); + SpdyStream3 *GetConsumerStream() { return mConsumerStream; }; + void SetConsumerStream(SpdyStream3 *aStream) { mConsumerStream = aStream; } + bool GetHashKey(nsCString &key); + + // override of SpdyStream3 + nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *); + nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *); + + nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return mLoadGroupCI; }; + void ConnectPushedStream(SpdyStream3 *consumer); + + bool DeferCleanupOnSuccess() { return mDeferCleanupOnSuccess; } + void SetDeferCleanupOnSuccess(bool val) { mDeferCleanupOnSuccess = val; } + + bool IsOrphaned(TimeStamp now); + + nsresult GetBufferedData(char *buf, uint32_t count, uint32_t *countWritten); + + // overload of SpdyStream3 + virtual bool HasSink() { return !!mConsumerStream; } + +private: + + SpdyStream3 *mConsumerStream; // paired request stream that consumes from + // real spdy one.. null until a match is made. + + nsCOMPtr mLoadGroupCI; + + SpdyPush3TransactionBuffer *mBufferedPush; + mozilla::TimeStamp mLastRead; + + nsCString mHashKey; + nsresult mStatus; + bool mPushCompleted; // server push FIN received + bool mDeferCleanupOnSuccess; +}; + +class SpdyPush3TransactionBuffer MOZ_FINAL : public nsAHttpTransaction +{ +public: + NS_DECL_ISUPPORTS + NS_DECL_NSAHTTPTRANSACTION + + SpdyPush3TransactionBuffer(); + virtual ~SpdyPush3TransactionBuffer(); + + nsresult GetBufferedData(char *buf, uint32_t count, uint32_t *countWritten); + void SetPushStream(SpdyPushedStream3 *stream) { mPushStream = stream; } + +private: + const static uint32_t kDefaultBufferSize = 4096; + + nsresult mStatus; + nsHttpRequestHead *mRequestHead; + SpdyPushedStream3 *mPushStream; + bool mIsDone; + + nsAutoArrayPtr mBufferedHTTP1; + uint32_t mBufferedHTTP1Size; + uint32_t mBufferedHTTP1Used; + uint32_t mBufferedHTTP1Consumed; +}; + +} // namespace mozilla::net +} // namespace mozilla + +#endif // mozilla_net_SpdyPush3_Internal_h diff --git a/netwerk/protocol/http/SpdySession3.cpp b/netwerk/protocol/http/SpdySession3.cpp index ba223eaec156..40c0bc32fd03 100644 --- a/netwerk/protocol/http/SpdySession3.cpp +++ b/netwerk/protocol/http/SpdySession3.cpp @@ -4,15 +4,18 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -#include "nsHttp.h" -#include "SpdySession3.h" -#include "SpdyStream3.h" -#include "nsHttpConnection.h" -#include "nsHttpHandler.h" -#include "prnetdb.h" #include "mozilla/Telemetry.h" #include "mozilla/Preferences.h" +#include "nsHttp.h" +#include "nsHttpHandler.h" +#include "nsHttpConnection.h" +#include "nsILoadGroup.h" #include "prprf.h" +#include "prnetdb.h" +#include "SpdyPush3.h" +#include "SpdySession3.h" +#include "SpdyStream3.h" + #include #ifdef DEBUG @@ -38,7 +41,6 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction, : mSocketTransport(aSocketTransport), mSegmentReader(nullptr), mSegmentWriter(nullptr), - mSendingChunkSize(ASpdySession::kSendingChunkSize), mNextStreamID(1), mConcurrentHighWater(0), mDownstreamState(BUFFERING_FRAME_HEADER), @@ -65,8 +67,11 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction, { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - LOG3(("SpdySession3::SpdySession3 %p transaction 1 = %p", - this, aHttpTransaction)); + static uint64_t sSerial; + mSerial = ++sSerial; + + LOG3(("SpdySession3::SpdySession3 %p transaction 1 = %p serial=0x%X\n", + this, aHttpTransaction, mSerial)); mStreamIDHash.Init(); mStreamTransactionHash.Init(); @@ -75,6 +80,7 @@ SpdySession3::SpdySession3(nsAHttpTransaction *aHttpTransaction, mOutputQueueBuffer = new char[mOutputQueueSize]; zlibInit(); + mPushAllowance = gHttpHandler->SpdyPushAllowance(); mSendingChunkSize = gHttpHandler->SpdySendingChunkSize(); GenerateSettings(); @@ -116,9 +122,12 @@ SpdySession3::GoAwayEnumerator(nsAHttpTransaction *key, // these streams were not processed by the server and can be restarted. // Do that after the enumerator completes to avoid the risk of - // a restart event re-entrantly modifying this hash. - if (stream->StreamID() > self->mGoAwayID || !stream->HasRegisteredID()) + // a restart event re-entrantly modifying this hash. Be sure not to restart + // a pushed (even numbered) stream + if ((stream->StreamID() > self->mGoAwayID && (stream->StreamID() & 1)) || + !stream->HasRegisteredID()) { self->mGoAwayStreamsToRestart.Push(stream); + } return PL_DHASH_NEXT; } @@ -220,12 +229,12 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now) MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(mNextPingID & 1, "Ping Counter Not Odd"); - if (!mPingThreshold) - return; - LOG(("SpdySession3::ReadTimeoutTick %p delta since last read %ds\n", this, PR_IntervalToSeconds(now - mLastReadEpoch))); + if (!mPingThreshold) + return; + if ((now - mLastReadEpoch) < mPingThreshold) { // recent activity means ping is not an issue if (mPingSentEpoch) @@ -260,6 +269,35 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now) mNextPingID += 2; ResumeRecv(); // read the ping reply + // Check for orphaned push streams. This looks expensive, but generally the + // list is empty. + SpdyPushedStream3 *deleteMe; + TimeStamp timestampNow; + do { + deleteMe = nullptr; + + for (uint32_t index = mPushedStreams.Length(); + index > 0 ; --index) { + SpdyPushedStream3 *pushedStream = mPushedStreams[index - 1]; + + if (timestampNow.IsNull()) + timestampNow = TimeStamp::Now(); // lazy initializer + + // if spdy finished, but not connected, and its been like that for too long.. + // cleanup the stream.. + if (pushedStream->IsOrphaned(timestampNow)) + { + LOG3(("SpdySession3 Timeout Pushed Stream %p 0x%X\n", + this, pushedStream->StreamID())); + deleteMe = pushedStream; + break; // don't CleanupStream() while iterating this vector + } + } + if (deleteMe) + CleanupStream(deleteMe, NS_ERROR_ABORT, RST_CANCEL); + + } while (deleteMe); + if (mNextPingID == 0xffffffff) { LOG(("SpdySession3::ReadTimeoutTick %p " "ping ids exhausted marking goaway\n", this)); @@ -268,35 +306,41 @@ SpdySession3::ReadTimeoutTick(PRIntervalTime now) } uint32_t -SpdySession3::RegisterStreamID(SpdyStream3 *stream) +SpdySession3::RegisterStreamID(SpdyStream3 *stream, uint32_t aNewID) { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - LOG3(("SpdySession3::RegisterStreamID session=%p stream=%p id=0x%X " - "concurrent=%d",this, stream, mNextStreamID, mConcurrent)); - MOZ_ASSERT(mNextStreamID < 0xfffffff0, "should have stopped admitting streams"); - uint32_t result = mNextStreamID; - mNextStreamID += 2; + MOZ_ASSERT(!(aNewID & 1), + "0 for autoassign pull, otherwise explicit even push assignment"); + if (!aNewID) { + // auto generate a new pull stream ID + aNewID = mNextStreamID; + MOZ_ASSERT(aNewID & 1, "pull ID must be odd."); + mNextStreamID += 2; + } + + LOG3(("SpdySession3::RegisterStreamID session=%p stream=%p id=0x%X " + "concurrent=%d",this, stream, aNewID, mConcurrent)); // We've used up plenty of ID's on this session. Start // moving to a new one before there is a crunch involving // server push streams or concurrent non-registered submits - if (mNextStreamID >= kMaxStreamID) + if (aNewID >= kMaxStreamID) mShouldGoAway = true; // integrity check - if (mStreamIDHash.Get(result)) { + if (mStreamIDHash.Get(aNewID)) { LOG3((" New ID already present\n")); MOZ_ASSERT(false, "New ID already present in mStreamIDHash"); mShouldGoAway = true; return kDeadStreamID; } - mStreamIDHash.Put(result, stream); - return result; + mStreamIDHash.Put(aNewID, stream); + return aNewID; } bool @@ -313,12 +357,7 @@ SpdySession3::AddStream(nsAHttpTransaction *aHttpTransaction, } aHttpTransaction->SetConnection(this); - SpdyStream3 *stream = new SpdyStream3(aHttpTransaction, - this, - mSocketTransport, - mSendingChunkSize, - &mUpstreamZlib, - aPriority); + SpdyStream3 *stream = new SpdyStream3(aHttpTransaction, this, aPriority); LOG3(("SpdySession3::AddStream session=%p stream=%p NextID=0x%X (tentative)", this, stream, mNextStreamID)); @@ -331,8 +370,7 @@ SpdySession3::AddStream(nsAHttpTransaction *aHttpTransaction, ActivateStream(stream); } else { - LOG3(("SpdySession3::AddStream %p stream %p queued.", - this, stream)); + LOG3(("SpdySession3::AddStream %p stream %p queued.", this, stream)); mQueuedStreams.Push(stream); } @@ -343,8 +381,10 @@ void SpdySession3::ActivateStream(SpdyStream3 *stream) { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); + MOZ_ASSERT(!stream->StreamID() || (stream->StreamID() & 1), + "Do not activate pushed streams"); - mConcurrent++; + ++mConcurrent; if (mConcurrent > mConcurrentHighWater) mConcurrentHighWater = mConcurrent; LOG3(("SpdySession3::AddStream %p activating stream %p Currently %d " @@ -486,9 +526,9 @@ SpdySession3::ResetDownstreamState() if (mInputFrameDataLast && mInputFrameDataStream) { mInputFrameDataLast = false; if (!mInputFrameDataStream->RecvdFin()) { + LOG3((" SetRecvdFin id=0x%x\n", mInputFrameDataStream->StreamID())); mInputFrameDataStream->SetRecvdFin(true); - --mConcurrent; - ProcessPending(); + DecrementConcurrent(mInputFrameDataStream); } } mInputFrameBufferUsed = 0; @@ -529,6 +569,21 @@ SpdySession3::EnsureBuffer(nsAutoArrayPtr &buf, uint32_t preserve, uint32_t &objSize); +void +SpdySession3::DecrementConcurrent(SpdyStream3 *aStream) +{ + uint32_t id = aStream->StreamID(); + + if (id && !(id & 0x1)) + return; // pushed streams aren't counted in concurrent limit + + MOZ_ASSERT(mConcurrent); + --mConcurrent; + LOG3(("DecrementConcurrent %p id=0x%X concurrent=%d\n", + this, id, mConcurrent)); + ProcessPending(); +} + void SpdySession3::zlibInit() { @@ -692,11 +747,13 @@ SpdySession3::GenerateSettings() // 2nd entry is bytes 20 to 27 // 3rd entry is bytes 28 to 35 + if (!gHttpHandler->AllowSpdyPush()) { // announcing that we accept 0 incoming streams is done to - // disable server push until that is implemented. - packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_MAX_CONCURRENT; - // The value portion of the setting pair is already initialized to 0 - numberOfEntries++; + // disable server push + packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_MAX_CONCURRENT; + // The value portion of the setting pair is already initialized to 0 + numberOfEntries++; + } nsRefPtr ci; uint32_t cwnd = 0; @@ -712,8 +769,11 @@ SpdySession3::GenerateSettings() numberOfEntries++; } + // Advertise the Push RWIN and on each client SYN_STREAM pipeline + // a window update with it in order to use larger initial windows with pulled + // streams. packet[15 + 8 * numberOfEntries] = SETTINGS_TYPE_INITIAL_WINDOW; - uint32_t rwin = PR_htonl(kInitialRwin); + uint32_t rwin = PR_htonl(mPushAllowance); memcpy(packet + 16 + 8 * numberOfEntries, &rwin, 4); numberOfEntries++; @@ -780,6 +840,7 @@ SpdySession3::VerifyStream(SpdyStream3 *aStream, uint32_t aOptionalID = 0) "optionalID=0x%X trans=%p test=%d\n", this, aStream, aStream->StreamID(), aOptionalID, aStream->Transaction(), test)); + MOZ_ASSERT(false, "VerifyStream"); return false; } @@ -792,24 +853,39 @@ SpdySession3::CleanupStream(SpdyStream3 *aStream, nsresult aResult, LOG3(("SpdySession3::CleanupStream %p %p 0x%X %X\n", this, aStream, aStream->StreamID(), aResult)); + SpdyPushedStream3 *pushSource = nullptr; + + if (NS_SUCCEEDED(aResult) && aStream->DeferCleanupOnSuccess()) { + LOG(("SpdySession3::CleanupStream 0x%X deferred\n", aStream->StreamID())); + return; + } + if (!VerifyStream(aStream)) { LOG(("SpdySession3::CleanupStream failed to verify stream\n")); return; } + pushSource = aStream->PushSource(); + if (!aStream->RecvdFin() && aStream->StreamID()) { LOG3(("Stream had not processed recv FIN, sending RST code %X\n", aResetCode)); GenerateRstStream(aResetCode, aStream->StreamID()); - --mConcurrent; - ProcessPending(); + DecrementConcurrent(aStream); } CloseStream(aStream, aResult); - // Remove the stream from the ID hash table. (this one isn't short, which is - // why it is hashed.) - mStreamIDHash.Remove(aStream->StreamID()); + // Remove the stream from the ID hash table and, if an even id, the pushed + // table too. + uint32_t id = aStream->StreamID(); + if (id > 0) { + mStreamIDHash.Remove(id); + if (!(id & 1)) + mPushedStreams.RemoveElement(aStream); + } + + RemoveStreamFromQueues(aStream); // removing from the stream transaction hash will // delete the SpdyStream3 and drop the reference to @@ -818,6 +894,29 @@ SpdySession3::CleanupStream(SpdyStream3 *aStream, nsresult aResult, if (mShouldGoAway && !mStreamTransactionHash.Count()) Close(NS_OK); + + if (pushSource) { + pushSource->SetDeferCleanupOnSuccess(false); + CleanupStream(pushSource, aResult, aResetCode); + } +} + +static void RemoveStreamFromQueue(SpdyStream3 *aStream, nsDeque &queue) +{ + uint32_t size = queue.GetSize(); + for (uint32_t count = 0; count < size; ++count) { + SpdyStream3 *stream = static_cast(queue.PopFront()); + if (stream != aStream) + queue.Push(stream); + } +} + +void +SpdySession3::RemoveStreamFromQueues(SpdyStream3 *aStream) +{ + RemoveStreamFromQueue(aStream, mReadyForWrite); + RemoveStreamFromQueue(aStream, mQueuedStreams); + RemoveStreamFromQueue(aStream, mReadyForRead); } void @@ -834,23 +933,7 @@ SpdySession3::CloseStream(SpdyStream3 *aStream, nsresult aResult) mInputFrameDataStream = nullptr; } - // check the streams blocked on write, this is linear but the list - // should be pretty short. - uint32_t size = mReadyForWrite.GetSize(); - for (uint32_t count = 0; count < size; ++count) { - SpdyStream3 *stream = static_cast(mReadyForWrite.PopFront()); - if (stream != aStream) - mReadyForWrite.Push(stream); - } - - // Check the streams queued for activation. Because we normally accept a high - // level of parallelization this should also be short. - size = mQueuedStreams.GetSize(); - for (uint32_t count = 0; count < size; ++count) { - SpdyStream3 *stream = static_cast(mQueuedStreams.PopFront()); - if (stream != aStream) - mQueuedStreams.Push(stream); - } + RemoveStreamFromQueues(aStream); // Send the stream the close() indication aStream->Close(aResult); @@ -871,9 +954,10 @@ SpdySession3::HandleSynStream(SpdySession3 *self) PR_ntohl(reinterpret_cast(self->mInputFrameBuffer.get())[2]); uint32_t associatedID = PR_ntohl(reinterpret_cast(self->mInputFrameBuffer.get())[3]); + uint8_t flags = reinterpret_cast(self->mInputFrameBuffer.get())[4]; LOG3(("SpdySession3::HandleSynStream %p recv SYN_STREAM (push) " - "for ID 0x%X associated with 0x%X.", + "for ID 0x%X associated with 0x%X.\n", self, streamID, associatedID)); if (streamID & 0x01) { // test for odd stream ID @@ -882,6 +966,12 @@ SpdySession3::HandleSynStream(SpdySession3 *self) return NS_ERROR_ILLEGAL_VALUE; } + // confirm associated-to + nsresult rv = self->SetInputFrameDataStream(associatedID); + if (NS_FAILED(rv)) + return rv; + SpdyStream3 *associatedStream = self->mInputFrameDataStream; + ++(self->mServerPushedResources); // Anytime we start using the high bit of stream ID (either client or server) @@ -889,17 +979,127 @@ SpdySession3::HandleSynStream(SpdySession3 *self) if (streamID >= kMaxStreamID) self->mShouldGoAway = true; - // Need to decompress the headers even though we aren't using them yet in - // order to keep the compression context consistent for other syn_reply frames - nsresult rv = - self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10); + bool resetStream = true; + SpdyPushCache3 *cache = nullptr; + + if (!(flags & kFlag_Data_UNI)) { + // pushed streams require UNIDIRECTIONAL flag + LOG3(("SpdySession3::HandleSynStream %p ID %0x%X associated ID 0x%X failed.\n", + self, streamID, associatedID)); + self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID); + + } else if (!associatedID) { + // associated stream 0 will never find a match, but the spec requires a + // PROTOCOL_ERROR in this specific case + LOG3(("SpdySession3::HandleSynStream %p associated ID of 0 failed.\n", self)); + self->GenerateRstStream(RST_PROTOCOL_ERROR, streamID); + + } else if (!gHttpHandler->AllowSpdyPush()) { + // MAX_CONCURRENT_STREAMS of 0 in settings should have disabled push, + // but some servers are buggy about that.. or the config could have + // been updated after the settings frame was sent. In both cases just + // reject the pushed stream as refused + LOG3(("SpdySession3::HandleSynStream Push Recevied when Disabled\n")); + self->GenerateRstStream(RST_REFUSED_STREAM, streamID); + + } else if (!associatedStream) { + LOG3(("SpdySession3::HandleSynStream %p lookup associated ID failed.\n", self)); + self->GenerateRstStream(RST_INVALID_STREAM, streamID); + + } else { + nsILoadGroupConnectionInfo *loadGroupCI = associatedStream->LoadGroupConnectionInfo(); + if (loadGroupCI) { + loadGroupCI->GetSpdyPushCache3(&cache); + if (!cache) { + cache = new SpdyPushCache3(); + if (!cache || NS_FAILED(loadGroupCI->SetSpdyPushCache3(cache))) { + delete cache; + cache = nullptr; + } + } + } + if (!cache) { + // this is unexpected, but we can handle it just be refusing the push + LOG3(("SpdySession3::HandleSynStream Push Recevied without loadgroup cache\n")); + self->GenerateRstStream(RST_REFUSED_STREAM, streamID); + } + else { + resetStream = false; + } + } + + if (resetStream) { + // Need to decompress the headers even though we aren't using them yet in + // order to keep the compression context consistent for other syn_reply frames + rv = self->UncompressAndDiscard(18, self->mInputFrameDataSize - 10); + if (NS_FAILED(rv)) { + LOG(("SpdySession3::HandleSynStream uncompress failed\n")); + return rv; + } + self->ResetDownstreamState(); + return NS_OK; + } + + // Create the buffering transaction and push stream + nsRefPtr transactionBuffer = + new SpdyPush3TransactionBuffer(); + transactionBuffer->SetConnection(self); + SpdyPushedStream3 *pushedStream = + new SpdyPushedStream3(transactionBuffer, self, + associatedStream, streamID); + + // ownership of the pushed stream is by the transaction hash, just as it + // is for a client initiated stream. Errors that aren't fatal to the + // whole session must call cleanupStream() after this point in order + // to remove the stream from that hash. + self->mStreamTransactionHash.Put(transactionBuffer, pushedStream); + self->mPushedStreams.AppendElement(pushedStream); + + // The pushed stream is unidirectional so it is fully open immediately + pushedStream->SetFullyOpen(); + + // Uncompress the response headers into a stream specific buffer, leaving them + // in spdy format for the time being. + rv = pushedStream->Uncompress(&self->mDownstreamZlib, + self->mInputFrameBuffer + 18, + self->mInputFrameDataSize - 10); if (NS_FAILED(rv)) { LOG(("SpdySession3::HandleSynStream uncompress failed\n")); return rv; } - // todo populate cache. For now, just reject server push p3 - self->GenerateRstStream(RST_REFUSED_STREAM, streamID); + if (self->RegisterStreamID(pushedStream, streamID) == kDeadStreamID) { + LOG(("SpdySession3::HandleSynStream registerstreamid failed\n")); + return NS_ERROR_FAILURE; + } + + // Fake the request side of the pushed HTTP transaction. Sets up hash + // key and origin + uint32_t notUsed; + pushedStream->ReadSegments(nullptr, 1, ¬Used); + + nsAutoCString key; + if (!pushedStream->GetHashKey(key)) { + LOG(("SpdySession3::HandleSynStream one of :host :scheme :path missing from push\n")); + self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); + self->ResetDownstreamState(); + return NS_OK; + } + + if (!associatedStream->Origin().Equals(pushedStream->Origin())) { + LOG(("SpdySession3::HandleSynStream pushed stream mismatched origin\n")); + self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); + self->ResetDownstreamState(); + return NS_OK; + } + + if (!cache->RegisterPushedStream(key, pushedStream)) { + LOG(("SpdySession3::HandleSynStream registerPushedStream Failed\n")); + self->CleanupStream(pushedStream, NS_ERROR_FAILURE, RST_INVALID_STREAM); + self->ResetDownstreamState(); + return NS_OK; + } + self->ResetDownstreamState(); return NS_OK; } @@ -1267,8 +1467,8 @@ SpdySession3::HandleGoAway(SpdySession3 *self) self->mCleanShutdown = true; // Find streams greater than the last-good ID and mark them for deletion - // in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. They can - // be restarted. + // in the mGoAwayStreamsToRestart queue with the GoAwayEnumerator. The + // underlying transaction can be restarted. self->mStreamTransactionHash.Enumerate(GoAwayEnumerator, self); // Process the streams marked for deletion and restart. @@ -1330,7 +1530,7 @@ SpdySession3::HandleHeaders(SpdySession3 *self) if (NS_FAILED(self->UncompressAndDiscard(12, self->mInputFrameDataSize - 4))) { - LOG(("SpdySession3::HandleSynReply uncompress failed\n")); + LOG(("SpdySession3::HandleHeaders uncompress failed\n")); // this is fatal to the session return NS_ERROR_FAILURE; } @@ -1466,8 +1666,9 @@ SpdySession3::OnTransportStatus(nsITransport* aTransport, case NS_NET_STATUS_CONNECTED_TO: { SpdyStream3 *target = mStreamIDHash.Get(1); - if (target) - target->Transaction()->OnTransportStatus(aTransport, aStatus, aProgress); + nsAHttpTransaction *transaction = target ? target->Transaction() : nullptr; + if (transaction) + transaction->OnTransportStatus(aTransport, aStatus, aProgress); break; } @@ -1626,6 +1827,42 @@ SpdySession3::WriteSegments(nsAHttpSegmentWriter *writer, SetWriteCallbacks(); + // If there are http transactions attached to a push stream with filled buffers + // trigger that data pump here. This only reads from buffers (not the network) + // so mDownstreamState doesn't matter. + SpdyStream3 *pushConnectedStream = + static_cast(mReadyForRead.PopFront()); + if (pushConnectedStream) { + LOG3(("SpdySession3::WriteSegments %p processing pushed stream 0x%X\n", + this, pushConnectedStream->StreamID())); + mSegmentWriter = writer; + rv = pushConnectedStream->WriteSegments(this, count, countWritten); + mSegmentWriter = nullptr; + + // The pipe in nsHttpTransaction rewrites CLOSED error codes into OK + // so we need this check to determine the truth. + if (NS_SUCCEEDED(rv) && !*countWritten && + pushConnectedStream->PushSource() && + pushConnectedStream->PushSource()->GetPushComplete()) { + rv = NS_BASE_STREAM_CLOSED; + } + + if (rv == NS_BASE_STREAM_CLOSED) { + CleanupStream(pushConnectedStream, NS_OK, RST_CANCEL); + rv = NS_OK; + } + + // if we return OK to nsHttpConnection it will use mSocketInCondition + // to determine whether to schedule more reads, incorrectly + // assuming that nsHttpConnection::OnSocketWrite() was called. + if (NS_SUCCEEDED(rv) || rv == NS_BASE_STREAM_WOULD_BLOCK) { + rv = NS_BASE_STREAM_WOULD_BLOCK; + ResumeRecv(); + } + + return rv; + } + // We buffer all control frames and act on them in this layer. // We buffer the first 8 bytes of data frames (the header) but // the actual data is passed through unprocessed. @@ -1860,9 +2097,9 @@ SpdySession3::WriteSegments(nsAHttpSegmentWriter *writer, return rv; } + MOZ_ASSERT(mDownstreamState == BUFFERING_CONTROL_FRAME); if (mDownstreamState != BUFFERING_CONTROL_FRAME) { // this cannot happen - MOZ_ASSERT(false, "Not in Bufering Control Frame State"); return NS_ERROR_UNEXPECTED; } @@ -1918,27 +2155,34 @@ SpdySession3::UpdateLocalRwin(SpdyStream3 *stream, if (!stream || stream->RecvdFin()) return; - LOG3(("SpdySession3::UpdateLocalRwin %p 0x%X %d\n", - this, stream->StreamID(), bytes)); stream->DecrementLocalWindow(bytes); // Don't necessarily ack every data packet. Only do it // after a significant amount of data. uint64_t unacked = stream->LocalUnAcked(); + int64_t localWindow = stream->LocalWindow(); - if (unacked < kMinimumToAck) { - // Sanity check to make sure this won't let the window drop below 1MB - PR_STATIC_ASSERT(kMinimumToAck < kInitialRwin); - PR_STATIC_ASSERT((kInitialRwin - kMinimumToAck) > 1024 * 1024); + LOG3(("SpdySession3::UpdateLocalRwin this=%p id=0x%X newbytes=%u " + "unacked=%llu localWindow=%lld\n", + this, stream->StreamID(), bytes, unacked, localWindow)); + if (!unacked) + return; + + if ((unacked < kMinimumToAck) && (localWindow > kEmergencyWindowThreshold)) + return; + + if (!stream->HasSink()) { + LOG3(("SpdySession3::UpdateLocalRwin %p 0x%X Pushed Stream Has No Sink\n", + this, stream->StreamID())); return; } // Generate window updates directly out of spdysession instead of the stream - // in order to avoid queue delays in getting the ACK out. + // in order to avoid queue delays in getting the 'ACK' out. uint32_t toack = (unacked <= 0x7fffffffU) ? unacked : 0x7fffffffU; - LOG3(("SpdySession3::UpdateLocalRwin Ack %p 0x%X %d\n", + LOG3(("SpdySession3::UpdateLocalRwin Ack this=%p id=0x%X acksize=%d\n", this, stream->StreamID(), toack)); stream->IncrementLocalWindow(toack); @@ -2205,6 +2449,13 @@ SpdySession3::SetNeedsCleanup() ResetDownstreamState(); } +void +SpdySession3::ConnectPushedStream(SpdyStream3 *stream) +{ + mReadyForRead.Push(stream); + ForceRecv(); +} + //----------------------------------------------------------------------------- // Modified methods of nsAHttpConnection //----------------------------------------------------------------------------- diff --git a/netwerk/protocol/http/SpdySession3.h b/netwerk/protocol/http/SpdySession3.h index 4f2d5ecb0a22..7e215c0ddf07 100644 --- a/netwerk/protocol/http/SpdySession3.h +++ b/netwerk/protocol/http/SpdySession3.h @@ -10,14 +10,13 @@ // http://dev.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3 #include "ASpdySession.h" +#include "mozilla/Attributes.h" #include "nsClassHashtable.h" #include "nsDataHashtable.h" #include "nsDeque.h" #include "nsHashKeys.h" #include "zlib.h" -#include "mozilla/Attributes.h" -class nsHttpConnection; class nsISocketTransport; namespace mozilla { namespace net { @@ -49,7 +48,8 @@ public: // Idle time represents time since "goodput".. e.g. a data or header frame PRIntervalTime IdleTime(); - uint32_t RegisterStreamID(SpdyStream3 *); + // Registering with a newID of 0 means pick the next available odd ID + uint32_t RegisterStreamID(SpdyStream3 *, uint32_t aNewID = 0); const static uint8_t kVersion = 3; @@ -134,11 +134,9 @@ public: // 31 bit stream ID. const static uint32_t kDeadStreamID = 0xffffdead; - // until we have an API that can push back on receiving data (right now - // WriteSegments is obligated to accept data and buffer) there is no - // reason to throttle with the rwin other than in server push - // scenarios. - const static uint32_t kInitialRwin = 256 * 1024 * 1024; + // below the emergency threshold of local window we ack every received + // byte. Above that we coalesce bytes into the MinimumToAck size. + const static int32_t kEmergencyWindowThreshold = 1024 * 1024; const static uint32_t kMinimumToAck = 64 * 1024; // The default peer rwin is 64KB unless updated by a settings frame @@ -174,8 +172,18 @@ public: uint32_t GetServerInitialWindow() { return mServerInitialWindow; } + void ConnectPushedStream(SpdyStream3 *stream); + + uint64_t Serial() { return mSerial; } + void PrintDiagnostics (nsCString &log); + // Streams need access to these + uint32_t SendingChunkSize() { return mSendingChunkSize; } + uint32_t PushAllowance() { return mPushAllowance; } + z_stream *UpstreamZlib() { return &mUpstreamZlib; } + nsISocketTransport *SocketTransport() { return mSocketTransport; } + private: enum stateType { @@ -192,6 +200,7 @@ private: void ChangeDownstreamState(enum stateType); void ResetDownstreamState(); nsresult UncompressAndDiscard(uint32_t, uint32_t); + void DecrementConcurrent(SpdyStream3 *); void zlibInit(); void GeneratePing(uint32_t); void GenerateRstStream(uint32_t, uint32_t); @@ -199,6 +208,7 @@ private: void CleanupStream(SpdyStream3 *, nsresult, rstReason); void CloseStream(SpdyStream3 *, nsresult); void GenerateSettings(); + void RemoveStreamFromQueues(SpdyStream3 *); void SetWriteCallbacks(); void FlushOutputQueue(); @@ -229,7 +239,7 @@ private: nsAutoPtr &, void *); - // This is intended to be nsHttpConnectionMgr:nsHttpConnectionHandle taken + // This is intended to be nsHttpConnectionMgr:nsConnectionHandle taken // from the first transaction on this session. That object contains the // pointer to the real network-level nsHttpConnection object. nsRefPtr mConnection; @@ -246,20 +256,25 @@ private: uint32_t mSendingChunkSize; /* the transmission chunk size */ uint32_t mNextStreamID; /* 24 bits */ uint32_t mConcurrentHighWater; /* max parallelism on session */ + uint32_t mPushAllowance; /* rwin for unmatched pushes */ stateType mDownstreamState; /* in frame, between frames, etc.. */ - // Maintain 4 indexes - one by stream ID, one by transaction ptr, - // one list of streams ready to write, one list of streams that are queued - // due to max parallelism settings. The objects - // are not ref counted - they get destroyed + // Maintain 2 indexes - one by stream ID, one by transaction pointer. + // There are also several lists of streams: ready to write, queued due to + // max parallelism, streams that need to force a read for push, and the full + // set of pushed streams. + // The objects are not ref counted - they get destroyed // by the nsClassHashtable implementation when they are removed from // the transaction hash. nsDataHashtable mStreamIDHash; nsClassHashtable, SpdyStream3> mStreamTransactionHash; + nsDeque mReadyForWrite; nsDeque mQueuedStreams; + nsDeque mReadyForRead; + nsTArray mPushedStreams; // Compression contexts for header transport using deflate. // SPDY compresses only HTTP headers and does not reset zlib in between @@ -361,6 +376,11 @@ private: // used as a temporary buffer while enumerating the stream hash during GoAway nsDeque mGoAwayStreamsToRestart; + + // Each session gets a unique serial number because the push cache is correlated + // by the load group and the serial number can be used as part of the cache key + // to make sure streams aren't shared across sessions. + uint64_t mSerial; }; }} // namespace mozilla::net diff --git a/netwerk/protocol/http/SpdyStream3.cpp b/netwerk/protocol/http/SpdyStream3.cpp index a0ac4319297c..c55413d0b1f2 100644 --- a/netwerk/protocol/http/SpdyStream3.cpp +++ b/netwerk/protocol/http/SpdyStream3.cpp @@ -4,16 +4,18 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -#include "nsHttp.h" -#include "SpdySession3.h" -#include "SpdyStream3.h" -#include "nsAlgorithm.h" -#include "prnetdb.h" -#include "nsHttpRequestHead.h" #include "mozilla/Telemetry.h" +#include "nsAlgorithm.h" +#include "nsHttp.h" +#include "nsHttpHandler.h" +#include "nsHttpRequestHead.h" #include "nsISocketTransport.h" #include "nsISupportsPriority.h" -#include "nsHttpHandler.h" +#include "prnetdb.h" +#include "SpdyPush3.h" +#include "SpdySession3.h" +#include "SpdyStream3.h" + #include #ifdef DEBUG @@ -26,21 +28,18 @@ namespace net { SpdyStream3::SpdyStream3(nsAHttpTransaction *httpTransaction, SpdySession3 *spdySession, - nsISocketTransport *socketTransport, - uint32_t chunkSize, - z_stream *compressionContext, int32_t priority) - : mUpstreamState(GENERATING_SYN_STREAM), - mTransaction(httpTransaction), + : mStreamID(0), mSession(spdySession), - mSocketTransport(socketTransport), + mUpstreamState(GENERATING_SYN_STREAM), + mSynFrameComplete(0), + mSentFinOnData(0), + mTransaction(httpTransaction), + mSocketTransport(spdySession->SocketTransport()), mSegmentReader(nullptr), mSegmentWriter(nullptr), - mStreamID(0), - mChunkSize(chunkSize), - mSynFrameComplete(0), + mChunkSize(spdySession->SendingChunkSize()), mRequestBlockedOnRead(0), - mSentFinOnData(0), mRecvdFin(0), mFullyOpen(0), mSentWaitingFor(0), @@ -49,23 +48,25 @@ SpdyStream3::SpdyStream3(nsAHttpTransaction *httpTransaction, mTxInlineFrameSize(SpdySession3::kDefaultBufferSize), mTxInlineFrameUsed(0), mTxStreamFrameSize(0), - mZlib(compressionContext), + mZlib(spdySession->UpstreamZlib()), mDecompressBufferSize(SpdySession3::kDefaultBufferSize), mDecompressBufferUsed(0), mDecompressedBytes(0), mRequestBodyLenRemaining(0), mPriority(priority), - mLocalWindow(SpdySession3::kInitialRwin), mLocalUnacked(0), mBlockedOnRwin(false), mTotalSent(0), - mTotalRead(0) + mTotalRead(0), + mPushSource(nullptr) { MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); LOG3(("SpdyStream3::SpdyStream3 %p", this)); mRemoteWindow = spdySession->GetServerInitialWindow(); + mLocalWindow = spdySession->PushAllowance(); + mTxInlineFrame = new uint8_t[mTxInlineFrameSize]; mDecompressBuffer = new char[mDecompressBufferSize]; } @@ -186,15 +187,16 @@ SpdyStream3::WriteSegments(nsAHttpSegmentWriter *writer, uint32_t count, uint32_t *countWritten) { - LOG3(("SpdyStream3::WriteSegments %p count=%d state=%x", - this, count, mUpstreamState)); - MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); MOZ_ASSERT(!mSegmentWriter, "segment writer in progress"); + LOG3(("SpdyStream3::WriteSegments %p count=%d state=%x", + this, count, mUpstreamState)); + mSegmentWriter = writer; - nsresult rv = mTransaction->WriteSegments(writer, count, countWritten); + nsresult rv = mTransaction->WriteSegments(this, count, countWritten); mSegmentWriter = nullptr; + return rv; } @@ -210,6 +212,26 @@ SpdyStream3::hdrHashEnumerate(const nsACString &key, return PL_DHASH_NEXT; } +void +SpdyStream3::CreatePushHashKey(const nsCString &scheme, + const nsCString &hostHeader, + uint64_t serial, + const nsCSubstring &pathInfo, + nsCString &outOrigin, + nsCString &outKey) +{ + outOrigin = scheme; + outOrigin.Append(NS_LITERAL_CSTRING("://")); + outOrigin.Append(hostHeader); + + outKey = outOrigin; + outKey.Append(NS_LITERAL_CSTRING("/[spdy3.")); + outKey.AppendInt(serial); + outKey.Append(NS_LITERAL_CSTRING("]")); + outKey.Append(pathInfo); +} + + nsresult SpdyStream3::ParseHttpRequestHeaders(const char *buf, uint32_t avail, @@ -247,6 +269,44 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf, *countUsed = avail - (oldLen - endHeader) + 4; mSynFrameComplete = 1; + nsCString hostHeader; + nsCString hashkey; + mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader); + + CreatePushHashKey(NS_LITERAL_CSTRING("https"), + hostHeader, mSession->Serial(), + mTransaction->RequestHead()->RequestURI(), + mOrigin, hashkey); + + // check the push cache for GET + if (mTransaction->RequestHead()->Method() == nsHttp::Get) { + // from :scheme, :host, :path + nsILoadGroupConnectionInfo *loadGroupCI = mTransaction->LoadGroupConnectionInfo(); + SpdyPushCache3 *cache = nullptr; + if (loadGroupCI) + loadGroupCI->GetSpdyPushCache3(&cache); + + SpdyPushedStream3 *pushedStream = nullptr; + // we remove the pushedstream from the push cache so that + // it will not be used for another GET. This does not destroy the + // stream itself - that is done when the transactionhash is done with it. + if (cache) + pushedStream = cache->RemovePushedStream(hashkey); + + if (pushedStream) { + LOG3(("Pushed Stream Match located id=0x%X key=%s\n", + pushedStream->StreamID(), hashkey.get())); + pushedStream->SetConsumerStream(this); + mPushSource = pushedStream; + mSentFinOnData = 1; + + // There is probably pushed data buffered so trigger a read manually + // as we can't rely on future network events to do it + mSession->ConnectPushedStream(this); + return NS_OK; + } + } + // It is now OK to assign a streamID that we are assured will // be monotonically increasing amongst syn-streams on this // session @@ -309,11 +369,6 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf, // The client cert "slot". Right now we don't send client certs mTxInlineFrame[17] = 0; - const char *methodHeader = mTransaction->RequestHead()->Method().get(); - - nsCString hostHeader; - mTransaction->RequestHead()->GetHeader(nsHttp::Host, hostHeader); - nsCString versionHeader; if (mTransaction->RequestHead()->Version() == NS_HTTP_VERSION_1_1) versionHeader = NS_LITERAL_CSTRING("HTTP/1.1"); @@ -388,6 +443,8 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf, // contain auth. The http transaction already logs the sanitized request // headers at this same level so it is not necessary to do so here. + const char *methodHeader = mTransaction->RequestHead()->Method().get(); + // The header block length uint16_t count = hdrHash.Count() + 5; /* method, path, version, host, scheme */ CompressToFrame(count); @@ -452,6 +509,75 @@ SpdyStream3::ParseHttpRequestHeaders(const char *buf, return NS_OK; } +void +SpdyStream3::AdjustInitialWindow() +{ + MOZ_ASSERT(mSession->PushAllowance() <= ASpdySession::kInitialRwin); + + // The session initial_window is sized for serverpushed streams. When we + // generate a client pulled stream we want to adjust the initial window + // to a huge value in a pipeline with that SYN_STREAM. + + // >0 even numbered IDs are pushed streams. + // odd numbered IDs are pulled streams. + // 0 is the sink for a pushed stream. + SpdyStream3 *stream = this; + if (!mStreamID) { + MOZ_ASSERT(mPushSource); + if (!mPushSource) + return; + stream = mPushSource; + MOZ_ASSERT(stream->mStreamID); + MOZ_ASSERT(!(stream->mStreamID & 1)); // is a push stream + + // If the pushed stream has sent a FIN, there is no reason to update + // the window + if (stream->RecvdFin()) + return; + } + + // For server pushes we also want to include in the ack any data that has been + // buffered but unacknowledged. + + // mLocalUnacked is technically 64 bits, but because it can never grow larger than + // our window size (which is closer to 29bits max) we know it fits comfortably in 32. + // However we don't really enforce that, and track it as a 64 so that broken senders + // can still interoperate. That means we have to be careful with this calculation. + uint64_t toack64 = (ASpdySession::kInitialRwin - mSession->PushAllowance()) + + stream->mLocalUnacked; + stream->mLocalUnacked = 0; + if (toack64 > 0x7fffffff) { + stream->mLocalUnacked = toack64 - 0x7fffffff; + toack64 = 0x7fffffff; + } + uint32_t toack = static_cast(toack64); + if (!toack) + return; + toack = PR_htonl(toack); + + SpdySession3::EnsureBuffer(mTxInlineFrame, + mTxInlineFrameUsed + 16, + mTxInlineFrameUsed, + mTxInlineFrameSize); + + unsigned char *packet = mTxInlineFrame.get() + mTxInlineFrameUsed; + mTxInlineFrameUsed += 16; + + memset(packet, 0, 8); + packet[0] = SpdySession3::kFlag_Control; + packet[1] = SpdySession3::kVersion; + packet[3] = SpdySession3::CONTROL_TYPE_WINDOW_UPDATE; + packet[7] = 8; // 8 data bytes after 8 byte header + + uint32_t id = PR_htonl(stream->mStreamID); + memcpy(packet + 8, &id, 4); + memcpy(packet + 12, &toack, 4); + + stream->mLocalWindow += PR_ntohl(toack); + LOG3(("AdjustInitialwindow %p 0x%X %u\n", + this, stream->mStreamID, PR_ntohl(toack))); +} + void SpdyStream3::UpdateTransportReadEvents(uint32_t count) { @@ -510,6 +636,16 @@ SpdyStream3::TransmitFrame(const char *buf, // flush internal buffers that were previously blocked on writing. You can // of course feed new data to it as well. + LOG3(("SpdyStream3::TransmitFrame %p inline=%d stream=%d", + this, mTxInlineFrameUsed, mTxStreamFrameSize)); + if (countUsed) + *countUsed = 0; + + if (!mTxInlineFrameUsed) { + MOZ_ASSERT(!buf); + return NS_OK; + } + MOZ_ASSERT(mTxInlineFrameUsed, "empty stream frame in transmit"); MOZ_ASSERT(mSegmentReader, "TransmitFrame with null mSegmentReader"); MOZ_ASSERT((buf && countUsed) || (!buf && !countUsed), @@ -518,11 +654,6 @@ SpdyStream3::TransmitFrame(const char *buf, uint32_t transmittedCount; nsresult rv; - LOG3(("SpdyStream3::TransmitFrame %p inline=%d stream=%d", - this, mTxInlineFrameUsed, mTxStreamFrameSize)); - if (countUsed) - *countUsed = 0; - // In the (relatively common) event that we have a small amount of data // split between the inlineframe and the streamframe, then move the stream // data into the inlineframe via copy in order to coalesce into one write. @@ -1071,8 +1202,7 @@ SpdyStream3::ConvertHeaders(nsACString &aHeadersOut) return NS_ERROR_ILLEGAL_VALUE; // spdy transport level headers shouldn't be gatewayed into http/1 - if (!nameString.Equals(NS_LITERAL_CSTRING(":version")) && - !nameString.Equals(NS_LITERAL_CSTRING(":status")) && + if (!nameString.IsEmpty() && nameString[0] != ':' && !nameString.Equals(NS_LITERAL_CSTRING("connection")) && !nameString.Equals(NS_LITERAL_CSTRING("keep-alive"))) { nsDependentCSubstring valueString = @@ -1224,7 +1354,7 @@ SpdyStream3::OnReadSegment(const char *buf, LOG3(("ParseHttpRequestHeaders %p used %d of %d. complete = %d", this, *countRead, count, mSynFrameComplete)); if (mSynFrameComplete) { - MOZ_ASSERT(mTxInlineFrameUsed, "OnReadSegment SynFrameComplete 0b"); + AdjustInitialWindow(); rv = TransmitFrame(nullptr, nullptr, true); if (rv == NS_BASE_STREAM_WOULD_BLOCK) { // this can't happen @@ -1306,16 +1436,25 @@ SpdyStream3::OnReadSegment(const char *buf, nsresult SpdyStream3::OnWriteSegment(char *buf, - uint32_t count, - uint32_t *countWritten) + uint32_t count, + uint32_t *countWritten) { - LOG3(("SpdyStream3::OnWriteSegment %p count=%d state=%x", - this, count, mUpstreamState)); + LOG3(("SpdyStream3::OnWriteSegment %p count=%d state=%x 0x%X\n", + this, count, mUpstreamState, mStreamID)); MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); - MOZ_ASSERT(mSegmentWriter, "OnWriteSegment with null mSegmentWriter"); + MOZ_ASSERT(mSegmentWriter); - return mSegmentWriter->OnWriteSegment(buf, count, countWritten); + if (!mPushSource) + return mSegmentWriter->OnWriteSegment(buf, count, countWritten); + + nsresult rv; + rv = mPushSource->GetBufferedData(buf, count, countWritten); + if (NS_FAILED(rv)) + return rv; + + mSession->ConnectPushedStream(this); + return NS_OK; } } // namespace mozilla::net diff --git a/netwerk/protocol/http/SpdyStream3.h b/netwerk/protocol/http/SpdyStream3.h index 2b6c563251e1..38c2baa46c28 100644 --- a/netwerk/protocol/http/SpdyStream3.h +++ b/netwerk/protocol/http/SpdyStream3.h @@ -6,26 +6,28 @@ #ifndef mozilla_net_SpdyStream3_h #define mozilla_net_SpdyStream3_h -#include "nsAHttpTransaction.h" #include "mozilla/Attributes.h" +#include "nsAHttpTransaction.h" namespace mozilla { namespace net { -class SpdyStream3 MOZ_FINAL : public nsAHttpSegmentReader - , public nsAHttpSegmentWriter +class SpdyStream3 : public nsAHttpSegmentReader + , public nsAHttpSegmentWriter { public: NS_DECL_NSAHTTPSEGMENTREADER NS_DECL_NSAHTTPSEGMENTWRITER - SpdyStream3(nsAHttpTransaction *, - SpdySession3 *, nsISocketTransport *, - uint32_t, z_stream *, int32_t); + SpdyStream3(nsAHttpTransaction *, SpdySession3 *, int32_t); uint32_t StreamID() { return mStreamID; } + SpdyPushedStream3 *PushSource() { return mPushSource; } - nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *); - nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *); + virtual nsresult ReadSegments(nsAHttpSegmentReader *, uint32_t, uint32_t *); + virtual nsresult WriteSegments(nsAHttpSegmentWriter *, uint32_t, uint32_t *); + virtual bool DeferCleanupOnSuccess() { return false; } + + const nsAFlatCString &Origin() const { return mOrigin; } bool RequestBlockedOnRead() { @@ -42,9 +44,10 @@ public: bool HasRegisteredID() { return mStreamID != 0; } - nsAHttpTransaction *Transaction() + nsAHttpTransaction *Transaction() { return mTransaction; } + virtual nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { - return mTransaction; + return mTransaction ? mTransaction->LoadGroupConnectionInfo() : nullptr; } void Close(nsresult reason); @@ -81,15 +84,25 @@ public: } uint64_t LocalUnAcked() { return mLocalUnacked; } + int64_t LocalWindow() { return mLocalWindow; } + bool BlockedOnRwin() { return mBlockedOnRwin; } -private: + // A pull stream has an implicit sink, a pushed stream has a sink + // once it is matched to a pull stream. + virtual bool HasSink() { return true; } - // a SpdyStream3 object is only destroyed by being removed from the - // SpdySession3 mStreamTransactionHash - make the dtor private to - // just the AutoPtr implementation needed for that hash. - friend class nsAutoPtr; - ~SpdyStream3(); + virtual ~SpdyStream3(); + +protected: + nsresult FindHeader(nsCString, nsDependentCSubstring &); + + static void CreatePushHashKey(const nsCString &scheme, + const nsCString &hostHeader, + uint64_t serial, + const nsCSubstring &pathInfo, + nsCString &outOrigin, + nsCString &outKey); enum stateType { GENERATING_SYN_STREAM, @@ -99,12 +112,36 @@ private: UPSTREAM_COMPLETE }; + uint32_t mStreamID; + + // The session that this stream is a subset of + SpdySession3 *mSession; + + nsCString mOrigin; + + // Each stream goes from syn_stream to upstream_complete, perhaps + // looping on multiple instances of generating_request_body and + // sending_request_body for each SPDY chunk in the upload. + enum stateType mUpstreamState; + + // Flag is set when all http request headers have been read and ID is stable + uint32_t mSynFrameComplete : 1; + + // Flag is set when a FIN has been placed on a data or syn packet + // (i.e after the client has closed) + uint32_t mSentFinOnData : 1; + + void ChangeState(enum stateType); + +private: + friend class nsAutoPtr; + static PLDHashOperator hdrHashEnumerate(const nsACString &, nsAutoPtr &, void *); - void ChangeState(enum stateType); nsresult ParseHttpRequestHeaders(const char *, uint32_t, uint32_t *); + void AdjustInitialWindow(); nsresult TransmitFrame(const char *, uint32_t *, bool forceCommitment); void GenerateDataFrameHeader(uint32_t, bool); @@ -114,12 +151,6 @@ private: void CompressToFrame(uint32_t); void CompressFlushFrame(); void ExecuteCompress(uint32_t); - nsresult FindHeader(nsCString, nsDependentCSubstring &); - - // Each stream goes from syn_stream to upstream_complete, perhaps - // looping on multiple instances of generating_request_body and - // sending_request_body for each SPDY chunk in the upload. - enum stateType mUpstreamState; // The underlying HTTP transaction. This pointer is used as the key // in the SpdySession3 mStreamTransactionHash so it is important to @@ -127,9 +158,6 @@ private: // (i.e. don't change it or release it after it is set in the ctor). nsRefPtr mTransaction; - // The session that this stream is a subset of - SpdySession3 *mSession; - // The underlying socket transport object is needed to propogate some events nsISocketTransport *mSocketTransport; @@ -139,23 +167,13 @@ private: nsAHttpSegmentReader *mSegmentReader; nsAHttpSegmentWriter *mSegmentWriter; - // The 24 bit SPDY stream ID - uint32_t mStreamID; - // The quanta upstream data frames are chopped into uint32_t mChunkSize; - // Flag is set when all http request headers have been read and ID is stable - uint32_t mSynFrameComplete : 1; - // Flag is set when the HTTP processor has more data to send // but has blocked in doing so. uint32_t mRequestBlockedOnRead : 1; - // Flag is set when a FIN has been placed on a data or syn packet - // (i.e after the client has closed) - uint32_t mSentFinOnData : 1; - // Flag is set after the response frame bearing the fin bit has // been processed. (i.e. after the server has closed). uint32_t mRecvdFin : 1; @@ -230,6 +248,9 @@ private: // For Progress Events uint64_t mTotalSent; uint64_t mTotalRead; + + // For SpdyPush + SpdyPushedStream3 *mPushSource; }; }} // namespace mozilla::net diff --git a/netwerk/protocol/http/moz.build b/netwerk/protocol/http/moz.build index ae3e5d12bbfc..848a8ad31937 100644 --- a/netwerk/protocol/http/moz.build +++ b/netwerk/protocol/http/moz.build @@ -39,6 +39,7 @@ EXPORTS.mozilla.net += [ 'HttpChannelParent.h', 'HttpInfo.h', 'PHttpChannelParams.h', + 'PSpdyPush3.h', ] CPP_SOURCES += [ @@ -50,6 +51,7 @@ CPP_SOURCES += [ 'HttpChannelParentListener.cpp', 'HttpInfo.cpp', 'NullHttpTransaction.cpp', + 'SpdyPush3.cpp', 'SpdySession2.cpp', 'SpdySession3.cpp', 'SpdyStream2.cpp', diff --git a/netwerk/protocol/http/nsAHttpConnection.h b/netwerk/protocol/http/nsAHttpConnection.h index a45bf322700a..647bdde6dd1a 100644 --- a/netwerk/protocol/http/nsAHttpConnection.h +++ b/netwerk/protocol/http/nsAHttpConnection.h @@ -47,6 +47,10 @@ public: virtual nsresult ResumeSend() = 0; virtual nsresult ResumeRecv() = 0; + // called by a transaction to force a "read from network" iteration + // even if not scheduled by socket associated with connection + virtual nsresult ForceRecv() = 0; + // After a connection has had ResumeSend() called by a transaction, // and it is ready to write to the network it may need to know the // transaction that has data to write. This is only an issue for @@ -175,6 +179,12 @@ public: return NS_ERROR_FAILURE; \ return (fwdObject)->ResumeRecv(); \ } \ + nsresult ForceRecv() \ + { \ + if (!(fwdObject)) \ + return NS_ERROR_FAILURE; \ + return (fwdObject)->ForceRecv(); \ + } \ nsISocketTransport *Transport() \ { \ if (!(fwdObject)) \ diff --git a/netwerk/protocol/http/nsAHttpTransaction.h b/netwerk/protocol/http/nsAHttpTransaction.h index f93d87bb2860..7b62de0c5fb8 100644 --- a/netwerk/protocol/http/nsAHttpTransaction.h +++ b/netwerk/protocol/http/nsAHttpTransaction.h @@ -16,6 +16,8 @@ class nsIEventTarget; class nsITransport; class nsHttpRequestHead; class nsHttpPipeline; +class nsHttpTransaction; +class nsILoadGroupConnectionInfo; //---------------------------------------------------------------------------- // Abstract base class for a HTTP transaction: @@ -100,6 +102,12 @@ public: virtual nsresult SetPipelinePosition(int32_t) = 0; virtual int32_t PipelinePosition() = 0; + // Occasionally the abstract interface has to give way to base implementations + // to respect differences between spdy, pipelines, etc.. + // These Query* (and IsNUllTransaction()) functions provide a way to do + // that without using xpcom or rtti. Any calling code that can't deal with + // a null response from one of them probably shouldn't be using nsAHttpTransaction + // If we used rtti this would be the result of doing // dynamic_cast(this).. i.e. it can be nullptr for // non pipeline implementations of nsAHttpTransaction @@ -110,6 +118,9 @@ public: // its IO functions all the time. virtual bool IsNullTransaction() { return false; } + // return the load group connection information associated with the transaction + virtual nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return nullptr; } + // Every transaction is classified into one of the types below. When using // HTTP pipelines, only transactions with the same type appear on the same // pipeline. @@ -177,7 +188,7 @@ public: // commitment now but might in the future and forceCommitment is not true . // (forceCommitment requires a hard failure or OK at this moment.) // - // Spdy uses this to make sure frames are atomic. + // SpdySession uses this to make sure frames are atomic. virtual nsresult CommitToSegmentSize(uint32_t size, bool forceCommitment) { return NS_ERROR_FAILURE; diff --git a/netwerk/protocol/http/nsHttpConnection.cpp b/netwerk/protocol/http/nsHttpConnection.cpp index 7e5f07c8df22..1aa77276c63c 100644 --- a/netwerk/protocol/http/nsHttpConnection.cpp +++ b/netwerk/protocol/http/nsHttpConnection.cpp @@ -1096,6 +1096,35 @@ nsHttpConnection::ResumeRecv() return NS_ERROR_UNEXPECTED; } + +class nsHttpConnectionForceRecv : public nsRunnable +{ +public: + nsHttpConnectionForceRecv(nsHttpConnection *aConn) + : mConn(aConn) {} + + NS_IMETHOD Run() + { + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); + + if (!mConn->mSocketIn) + return NS_OK; + return mConn->OnInputStreamReady(mConn->mSocketIn); + } +private: + nsRefPtr mConn; +}; + +// trigger an asynchronous read +nsresult +nsHttpConnection::ForceRecv() +{ + LOG(("nsHttpConnection::ForceRecv [this=%p]\n", this)); + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); + + return NS_DispatchToCurrentThread(new nsHttpConnectionForceRecv(this)); +} + void nsHttpConnection::BeginIdleMonitoring() { diff --git a/netwerk/protocol/http/nsHttpConnection.h b/netwerk/protocol/http/nsHttpConnection.h index b63058653f47..89177c82c419 100644 --- a/netwerk/protocol/http/nsHttpConnection.h +++ b/netwerk/protocol/http/nsHttpConnection.h @@ -120,6 +120,9 @@ public: nsresult ResumeRecv(); int64_t MaxBytesRead() {return mMaxBytesRead;} + friend class nsHttpConnectionForceRecv; + nsresult ForceRecv(); + static NS_METHOD ReadFromStream(nsIInputStream *, void *, const char *, uint32_t, uint32_t, uint32_t *); diff --git a/netwerk/protocol/http/nsHttpHandler.cpp b/netwerk/protocol/http/nsHttpHandler.cpp index 5178264c1025..05633992dd69 100644 --- a/netwerk/protocol/http/nsHttpHandler.cpp +++ b/netwerk/protocol/http/nsHttpHandler.cpp @@ -183,8 +183,10 @@ nsHttpHandler::nsHttpHandler() , mCoalesceSpdy(true) , mUseAlternateProtocol(false) , mSpdyPersistentSettings(false) + , mAllowSpdyPush(true) , mSpdySendingChunkSize(ASpdySession::kSendingChunkSize) , mSpdySendBufferSize(ASpdySession::kTCPSendBufferSize) + , mSpdyPushAllowance(32768) , mSpdyPingThreshold(PR_SecondsToInterval(58)) , mSpdyPingTimeout(PR_SecondsToInterval(8)) , mConnectTimeout(90000) @@ -1159,6 +1161,22 @@ nsHttpHandler::PrefsChanged(nsIPrefBranch *prefs, const char *pref) PR_SecondsToInterval((uint16_t) clamped(val, 0, 0x7fffffff)); } + if (PREF_CHANGED(HTTP_PREF("spdy.allow-push"))) { + rv = prefs->GetBoolPref(HTTP_PREF("spdy.allow-push"), + &cVar); + if (NS_SUCCEEDED(rv)) + mAllowSpdyPush = cVar; + } + + if (PREF_CHANGED(HTTP_PREF("spdy.push-allowance"))) { + rv = prefs->GetIntPref(HTTP_PREF("spdy.push-allowance"), &val); + if (NS_SUCCEEDED(rv)) { + mSpdyPushAllowance = + static_cast + (clamped(val, 1024, static_cast(ASpdySession::kInitialRwin))); + } + } + // The amount of seconds to wait for a spdy ping response before // closing the session. if (PREF_CHANGED(HTTP_PREF("spdy.send-buffer-size"))) { diff --git a/netwerk/protocol/http/nsHttpHandler.h b/netwerk/protocol/http/nsHttpHandler.h index be68164866d5..7dcb75489871 100644 --- a/netwerk/protocol/http/nsHttpHandler.h +++ b/netwerk/protocol/http/nsHttpHandler.h @@ -101,8 +101,10 @@ public: bool UseSpdyPersistentSettings() { return mSpdyPersistentSettings; } uint32_t SpdySendingChunkSize() { return mSpdySendingChunkSize; } uint32_t SpdySendBufferSize() { return mSpdySendBufferSize; } + uint32_t SpdyPushAllowance() { return mSpdyPushAllowance; } PRIntervalTime SpdyPingThreshold() { return mSpdyPingThreshold; } PRIntervalTime SpdyPingTimeout() { return mSpdyPingTimeout; } + bool AllowSpdyPush() { return mAllowSpdyPush; } uint32_t ConnectTimeout() { return mConnectTimeout; } uint32_t ParallelSpeculativeConnectLimit() { return mParallelSpeculativeConnectLimit; } bool CritialRequestPrioritization() { return mCritialRequestPrioritization; } @@ -416,8 +418,10 @@ private: bool mCoalesceSpdy; bool mUseAlternateProtocol; bool mSpdyPersistentSettings; + bool mAllowSpdyPush; uint32_t mSpdySendingChunkSize; uint32_t mSpdySendBufferSize; + uint32_t mSpdyPushAllowance; PRIntervalTime mSpdyPingThreshold; PRIntervalTime mSpdyPingTimeout; diff --git a/netwerk/protocol/http/nsHttpTransaction.h b/netwerk/protocol/http/nsHttpTransaction.h index 6cc994291321..655eaf877582 100644 --- a/netwerk/protocol/http/nsHttpTransaction.h +++ b/netwerk/protocol/http/nsHttpTransaction.h @@ -115,8 +115,9 @@ public: const mozilla::TimeStamp GetPendingTime() { return mPendingTime; } bool UsesPipelining() const { return mCaps & NS_HTTP_ALLOW_PIPELINING; } - void SetLoadGroupConnectionInfo(nsILoadGroupConnectionInfo *aLoadGroupCI) { mLoadGroupCI = aLoadGroupCI; } + // overload of nsAHttpTransaction::LoadGroupConnectionInfo() nsILoadGroupConnectionInfo *LoadGroupConnectionInfo() { return mLoadGroupCI.get(); } + void SetLoadGroupConnectionInfo(nsILoadGroupConnectionInfo *aLoadGroupCI) { mLoadGroupCI = aLoadGroupCI; } void DispatchedAsBlocking(); void RemoveDispatchedAsBlocking();