bug 819734 - Token Bucket for Network Bursts part 1/2 [base] r=honzab

This commit is contained in:
Patrick McManus 2013-04-15 08:50:35 -04:00
Родитель fe05965115
Коммит 2793bc2a0e
11 изменённых файлов: 689 добавлений и 8 удалений

Просмотреть файл

@ -1003,6 +1003,11 @@ pref("network.http.spdy.send-buffer-size", 131072);
pref("network.http.diagnostics", false);
pref("network.http.pacing.requests.enabled", false);
pref("network.http.pacing.requests.min-parallelism", 6);
pref("network.http.pacing.requests.hz", 100);
pref("network.http.pacing.requests.burst", 32);
// default values for FTP
// in a DSCP environment this should be 40 (0x28, or AF11), per RFC-4594,
// Section 4.8 "High-Throughput Data Service Class", and 80 (0x50, or AF22)

Просмотреть файл

@ -0,0 +1,313 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "EventTokenBucket.h"
#include "nsNetUtil.h"
#include "nsSocketTransportService2.h"
extern PRThread *gSocketThread;
namespace mozilla {
namespace net {
////////////////////////////////////////////
// EventTokenBucketCancelable
////////////////////////////////////////////
class TokenBucketCancelable : public nsICancelable
{
public:
NS_DECL_ISUPPORTS
NS_DECL_NSICANCELABLE
TokenBucketCancelable(class ATokenBucketEvent *event);
virtual ~TokenBucketCancelable() {}
void Fire();
private:
friend class EventTokenBucket;
ATokenBucketEvent *mEvent;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(TokenBucketCancelable, nsICancelable)
TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event)
: mEvent(event)
{
}
NS_IMETHODIMP
TokenBucketCancelable::Cancel(nsresult reason)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
mEvent = nullptr;
return NS_OK;
}
void
TokenBucketCancelable::Fire()
{
if (!mEvent)
return;
ATokenBucketEvent *event = mEvent;
mEvent = nullptr;
event->OnTokenBucketAdmitted();
}
////////////////////////////////////////////
// EventTokenBucket
////////////////////////////////////////////
NS_IMPL_THREADSAFE_ISUPPORTS1(EventTokenBucket, nsITimerCallback)
// by default 1hz with no burst
EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond,
uint32_t burstSize)
: mUnitCost(kUsecPerSec)
, mMaxCredit(kUsecPerSec)
, mCredit(kUsecPerSec)
, mPaused(false)
, mStopped(false)
, mTimerArmed(false)
{
MOZ_COUNT_CTOR(EventTokenBucket);
mLastUpdate = TimeStamp::Now();
MOZ_ASSERT(NS_IsMainThread());
nsresult rv;
nsCOMPtr<nsIEventTarget> sts;
nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
if (NS_SUCCEEDED(rv))
sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_SUCCEEDED(rv))
mTimer = do_CreateInstance("@mozilla.org/timer;1");
if (mTimer)
mTimer->SetTarget(sts);
SetRate(eventsPerSecond, burstSize);
}
EventTokenBucket::~EventTokenBucket()
{
SOCKET_LOG(("EventTokenBucket::dtor %p events=%d\n",
this, mEvents.GetSize()));
MOZ_COUNT_DTOR(EventTokenBucket);
if (mTimer && mTimerArmed)
mTimer->Cancel();
// Complete any queued events to prevent hangs
while (mEvents.GetSize()) {
nsRefPtr<TokenBucketCancelable> cancelable =
dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
cancelable->Fire();
}
}
void
EventTokenBucket::SetRate(uint32_t eventsPerSecond,
uint32_t burstSize)
{
SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n",
this, eventsPerSecond, burstSize));
if (eventsPerSecond > kMaxHz) {
eventsPerSecond = kMaxHz;
SOCKET_LOG((" eventsPerSecond out of range\n"));
}
if (!eventsPerSecond) {
eventsPerSecond = 1;
SOCKET_LOG((" eventsPerSecond out of range\n"));
}
mUnitCost = kUsecPerSec / eventsPerSecond;
mMaxCredit = mUnitCost * burstSize;
if (mMaxCredit > kUsecPerSec * 60 * 15) {
SOCKET_LOG((" burstSize out of range\n"));
mMaxCredit = kUsecPerSec * 60 * 15;
}
mCredit = mMaxCredit;
mLastUpdate = TimeStamp::Now();
}
void
EventTokenBucket::ClearCredits()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
mCredit = 0;
}
uint32_t
EventTokenBucket::BurstEventsAvailable()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return static_cast<uint32_t>(mCredit / mUnitCost);
}
uint32_t
EventTokenBucket::QueuedEvents()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
return mEvents.GetSize();
}
void
EventTokenBucket::Pause()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
if (mPaused || mStopped)
return;
mPaused = true;
if (mTimerArmed) {
mTimer->Cancel();
mTimerArmed = false;
}
}
void
EventTokenBucket::UnPause()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
if (!mPaused || mStopped)
return;
mPaused = false;
DispatchEvents();
UpdateTimer();
}
nsresult
EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
if (mStopped || !mTimer)
return NS_ERROR_FAILURE;
UpdateCredits();
nsRefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event);
// When this function exits the cancelEvent needs 2 references, one for the
// mEvents queue and one for the caller of SubmitEvent()
NS_ADDREF(*cancelable = cancelEvent.get());
if (mPaused || !TryImmediateDispatch(cancelEvent.get())) {
// queue it
SOCKET_LOG((" queued\n"));
mEvents.Push(cancelEvent.forget().get());
UpdateTimer();
}
else {
SOCKET_LOG((" dispatched synchronously\n"));
}
return NS_OK;
}
bool
EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable)
{
if (mCredit < mUnitCost)
return false;
mCredit -= mUnitCost;
cancelable->Fire();
return true;
}
void
EventTokenBucket::DispatchEvents()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused));
if (mPaused || mStopped)
return;
while (mEvents.GetSize() && mUnitCost <= mCredit) {
nsRefPtr<TokenBucketCancelable> cancelable =
dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
if (cancelable->mEvent) {
SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] "
"Dispatching queue token bucket event cost=%lu credit=%lu\n",
this, mUnitCost, mCredit));
mCredit -= mUnitCost;
cancelable->Fire();
}
}
}
void
EventTokenBucket::UpdateTimer()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer)
return;
if (mCredit >= mUnitCost)
return;
// determine the time needed to wait to accumulate enough credits to admit
// one more event and set the timer for that point. Always round it
// up because firing early doesn't help.
//
uint64_t deficit = mUnitCost - mCredit;
uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec;
if (msecWait < 4) // minimum wait
msecWait = 4;
else if (msecWait > 60000) // maximum wait
msecWait = 60000;
SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %dms\n",
this, msecWait));
nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait),
nsITimer::TYPE_ONE_SHOT);
mTimerArmed = NS_SUCCEEDED(rv);
}
NS_IMETHODIMP
EventTokenBucket::Notify(nsITimer *timer)
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
mTimerArmed = false;
if (mStopped)
return NS_OK;
UpdateCredits();
DispatchEvents();
UpdateTimer();
return NS_OK;
}
void
EventTokenBucket::UpdateCredits()
{
MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread);
TimeStamp now = TimeStamp::Now();
TimeDuration elapsed = now - mLastUpdate;
mLastUpdate = now;
mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds());
if (mCredit > mMaxCredit)
mCredit = mMaxCredit;
SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %lu (%lu each.. %3.2f)\n",
this, mCredit, mUnitCost, (double)mCredit / mUnitCost));
}
} // mozilla::net
} // mozilla

Просмотреть файл

@ -0,0 +1,127 @@
/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim:set ts=2 sw=2 sts=2 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef NetEventTokenBucket_h__
#define NetEventTokenBucket_h__
#include "nsCOMPtr.h"
#include "nsDeque.h"
#include "nsICancelable.h"
#include "nsITimer.h"
#include "mozilla/TimeStamp.h"
namespace mozilla {
namespace net {
/* A token bucket is used to govern the maximum rate a series of events
can be executed at. For instance if your event was "eat a piece of cake"
then a token bucket configured to allow "1 piece per day" would spread
the eating of a 8 piece cake over 8 days even if you tried to eat the
whole thing up front. In a practical sense it 'costs' 1 token to execute
an event and tokens are 'earned' at a particular rate as time goes by.
The token bucket can be perfectly smooth or allow a configurable amount of
burstiness. A bursty token bucket allows you to save up unused credits, while
a perfectly smooth one would not. A smooth "1 per day" cake token bucket
would require 9 days to eat that cake if you skipped a slice on day 4
(use the token or lose it), while a token bucket configured with a burst
of 2 would just let you eat 2 slices on day 5 (the credits for day 4 and day
5) and finish the cake in the usual 8 days.
EventTokenBucket(hz=20, burst=5) creates a token bucket with the following properties:
+ events from an infinite stream will be admitted 20 times per second (i.e.
hz=20 means 1 event per 50 ms). Timers will be used to space things evenly down to
5ms gaps (i.e. up to 200hz). Token buckets with rates greater than 200hz will admit
multiple events with 5ms gaps between them. 10000hz is the maximum rate and 1hz is
the minimum rate.
+ The burst size controls the limit of 'credits' that a token bucket can accumulate
when idle. For our (20,5) example each event requires 50ms of credit (again, 20hz = 50ms
per event). a burst size of 5 means that the token bucket can accumulate a
maximum of 250ms (5 * 50ms) for this bucket. If no events have been admitted for the
last full second the bucket can still only accumulate 250ms of credit - but that credit
means that 5 events can be admitted without delay. A burst size of 1 is the minimum.
The EventTokenBucket is created with maximum credits already applied, but they
can be cleared with the ClearCredits() method. The maximum burst size is
15 minutes worth of events.
+ An event is submitted to the token bucket asynchronously through SubmitEvent().
The OnTokenBucketAdmitted() method of the submitted event is used as a callback
when the event is ready to run. A cancelable event is returned to the SubmitEvent() caller
for use in the case they do not wish to wait for the callback.
*/
class EventTokenBucket;
class ATokenBucketEvent
{
public:
virtual void OnTokenBucketAdmitted() = 0;
};
class TokenBucketCancelable;
class EventTokenBucket : public nsITimerCallback
{
public:
NS_DECL_ISUPPORTS
NS_DECL_NSITIMERCALLBACK
// This should be constructed on the main thread
EventTokenBucket(uint32_t eventsPerSecond, uint32_t burstSize);
virtual ~EventTokenBucket();
// These public methods are all meant to be called from the socket thread
void ClearCredits();
uint32_t BurstEventsAvailable();
uint32_t QueuedEvents();
// a paused token bucket will not process any events, but it will accumulate
// credits. ClearCredits can be used before unpausing if desired.
void Pause();
void UnPause();
void Stop() { mStopped = true; }
// The returned cancelable event can only be canceled from the socket thread
nsresult SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable);
private:
friend class RunNotifyEvent;
friend class SetTimerEvent;
bool TryImmediateDispatch(TokenBucketCancelable *event);
void SetRate(uint32_t eventsPerSecond, uint32_t burstSize);
void DispatchEvents();
void UpdateTimer();
void UpdateCredits();
const static uint64_t kUsecPerSec = 1000000;
const static uint64_t kUsecPerMsec = 1000;
const static uint64_t kMaxHz = 10000;
uint64_t mUnitCost; // usec of credit needed for 1 event (from eventsPerSecond)
uint64_t mMaxCredit; // usec mCredit limit (from busrtSize)
uint64_t mCredit; // usec of accumulated credit.
bool mPaused;
bool mStopped;
nsDeque mEvents;
bool mTimerArmed;
TimeStamp mLastUpdate;
// The timer is created on the main thread, but is armed and executes Notify()
// callbacks on the socket thread in order to maintain low latency of event
// delivery.
nsCOMPtr<nsITimer> mTimer;
};
} // ::mozilla::net
} // ::mozilla
#endif

Просмотреть файл

@ -79,6 +79,7 @@ CPPSRCS = \
ProxyAutoConfig.cpp \
Dashboard.cpp \
NetworkActivityMonitor.cpp \
EventTokenBucket.cpp \
$(NULL)
LOCAL_INCLUDES += -I$(topsrcdir)/dom/base

Просмотреть файл

@ -1247,7 +1247,7 @@ nsHttpConnection::OnSocketWritable()
else {
if (!mReportedSpdy) {
mReportedSpdy = true;
gHttpHandler->ConnMgr()->ReportSpdyConnection(this, mUsingSpdyVersion);
gHttpHandler->ConnMgr()->ReportSpdyConnection(this, mEverUsedSpdy);
}
LOG((" writing transaction request stream\n"));

Просмотреть файл

@ -63,6 +63,7 @@ nsHttpConnectionMgr::nsHttpConnectionMgr()
, mIsShuttingDown(false)
, mNumActiveConns(0)
, mNumIdleConns(0)
, mNumSpdyActiveConns(0)
, mNumHalfOpenConns(0)
, mTimeOfNextWakeUp(UINT64_MAX)
, mTimeoutTickArmed(false)
@ -413,6 +414,28 @@ nsHttpConnectionMgr::ProcessPendingQ()
return PostEvent(&nsHttpConnectionMgr::OnMsgProcessPendingQ, 0, nullptr);
}
void
nsHttpConnectionMgr::OnMsgUpdateRequestTokenBucket(int32_t, void *param)
{
nsRefPtr<EventTokenBucket> tokenBucket =
dont_AddRef(static_cast<EventTokenBucket *>(param));
gHttpHandler->SetRequestTokenBucket(tokenBucket);
}
nsresult
nsHttpConnectionMgr::UpdateRequestTokenBucket(EventTokenBucket *aBucket)
{
nsRefPtr<EventTokenBucket> bucket(aBucket);
// Call From main thread when a new EventTokenBucket has been made in order
// to post the new value to the socket thread.
nsresult rv = PostEvent(&nsHttpConnectionMgr::OnMsgUpdateRequestTokenBucket,
0, bucket.get());
if (NS_SUCCEEDED(rv))
bucket.forget();
return rv;
}
// Given a nsHttpConnectionInfo find the connection entry object that
// contains either the nshttpconnection or nshttptransaction parameter.
// Normally this is done by the hashkey lookup of connectioninfo,
@ -502,8 +525,9 @@ nsHttpConnectionMgr::ReportSpdyConnection(nsHttpConnection *conn,
if (!usingSpdy)
return;
ent->mUsingSpdy = true;
mNumSpdyActiveConns++;
uint32_t ttl = conn->TimeToLive();
uint64_t timeOfExpire = NowInSeconds() + ttl;
@ -957,7 +981,7 @@ nsHttpConnectionMgr::ShutdownPassCB(const nsACString &key,
conn = ent->mActiveConns[0];
ent->mActiveConns.RemoveElementAt(0);
self->mNumActiveConns--;
self->DecrementActiveConnCount(conn);
conn->Close(NS_ERROR_ABORT);
NS_RELEASE(conn);
@ -1588,6 +1612,23 @@ nsHttpConnectionMgr::TryDispatchTransaction(nsConnectionEntry *ent,
}
}
// Subject most transactions at high parallelism to rate pacing.
// It will only be actually submitted to the
// token bucket once, and if possible it is granted admission synchronously.
// It is important to leave a transaction in the pending queue when blocked by
// pacing so it can be found on cancel if necessary.
// Transactions that cause blocking or bypass it (e.g. js/css) are not rate
// limited.
if (gHttpHandler->UseRequestTokenBucket() &&
(mNumActiveConns >= mNumSpdyActiveConns) && // just check for robustness sake
((mNumActiveConns - mNumSpdyActiveConns) >= gHttpHandler->RequestTokenBucketMinParallelism()) &&
!(caps & (NS_HTTP_LOAD_AS_BLOCKING | NS_HTTP_LOAD_UNBLOCKED))) {
if (!trans->TryToRunPacedRequest()) {
LOG((" blocked due to rate pacing\n"));
return NS_ERROR_NOT_AVAILABLE;
}
}
// step 2
// consider an idle persistent connection
if (caps & NS_HTTP_ALLOW_KEEPALIVE) {
@ -1678,6 +1719,11 @@ nsHttpConnectionMgr::DispatchTransaction(nsConnectionEntry *ent,
"[ci=%s trans=%x caps=%x conn=%x priority=%d]\n",
ent->mConnInfo->HashKey().get(), trans, caps, conn, priority));
// It is possible for a rate-paced transaction to be dispatched independent
// of the token bucket when the amount of parallelization has changed or
// when a muxed connection (e.g. spdy or pipelines) becomes available.
trans->CancelPacing(NS_OK);
if (conn->UsingSpdy()) {
LOG(("Spdy Dispatch Transaction via Activate(). Transaction host = %s,"
"Connection host = %s\n",
@ -1765,7 +1811,7 @@ nsHttpConnectionMgr::DispatchAbstractTransaction(nsConnectionEntry *ent,
ent->mActiveConns.RemoveElement(conn);
if (conn == ent->mYellowConnection)
ent->OnYellowComplete();
mNumActiveConns--;
DecrementActiveConnCount(conn);
ConditionallyStopTimeoutTick();
// sever back references to connection, and do so without triggering
@ -1901,6 +1947,14 @@ nsHttpConnectionMgr::AddActiveConn(nsHttpConnection *conn,
ActivateTimeoutTick();
}
void
nsHttpConnectionMgr::DecrementActiveConnCount(nsHttpConnection *conn)
{
mNumActiveConns--;
if (conn->EverUsedSpdy())
mNumSpdyActiveConns--;
}
void
nsHttpConnectionMgr::StartedConnect()
{
@ -2213,13 +2267,13 @@ nsHttpConnectionMgr::OnMsgReclaimConnection(int32_t, void *param)
// reused.
conn->DontReuse();
}
if (ent->mActiveConns.RemoveElement(conn)) {
if (conn == ent->mYellowConnection)
ent->OnYellowComplete();
nsHttpConnection *temp = conn;
NS_RELEASE(temp);
mNumActiveConns--;
DecrementActiveConnCount(conn);
ConditionallyStopTimeoutTick();
}

Просмотреть файл

@ -29,6 +29,12 @@ class nsHttpPipeline;
class nsIHttpUpgradeListener;
namespace mozilla {
namespace net {
class EventTokenBucket;
}
}
//-----------------------------------------------------------------------------
class nsHttpConnectionMgr : public nsIObserver
@ -124,6 +130,10 @@ public:
// been initialized.
nsresult UpdateParam(nsParamName name, uint16_t value);
// called from main thread to post a new request token bucket
// to the socket thread
nsresult UpdateRequestTokenBucket(mozilla::net::EventTokenBucket *aBucket);
// Lookup/Cancel HTTP->SPDY redirections
bool GetSpdyAlternateProtocol(nsACString &key);
void ReportSpdyAlternateProtocol(nsHttpConnection *);
@ -517,6 +527,7 @@ private:
nsresult CreateTransport(nsConnectionEntry *, nsAHttpTransaction *,
uint32_t, bool);
void AddActiveConn(nsHttpConnection *, nsConnectionEntry *);
void DecrementActiveConnCount(nsHttpConnection *);
void StartedConnect();
void RecvdConnect();
@ -605,6 +616,7 @@ private:
void OnMsgClosePersistentConnections (int32_t, void *);
void OnMsgProcessFeedback (int32_t, void *);
void OnMsgProcessAllSpdyPendingQ (int32_t, void *);
void OnMsgUpdateRequestTokenBucket (int32_t, void *);
// Total number of active connections in all of the ConnectionEntry objects
// that are accessed from mCT connection table.
@ -612,6 +624,8 @@ private:
// Total number of idle connections in all of the ConnectionEntry objects
// that are accessed from mCT connection table.
uint16_t mNumIdleConns;
// Total number of spdy connections which are a subset of the active conns
uint16_t mNumSpdyActiveConns;
// Total number of connections in mHalfOpens ConnectionEntry objects
// that are accessed from mCT connection table
uint32_t mNumHalfOpenConns;

Просмотреть файл

@ -37,6 +37,8 @@
#include "nsAlgorithm.h"
#include "ASpdySession.h"
#include "mozIApplicationClearPrivateDataParams.h"
#include "nsICancelable.h"
#include "EventTokenBucket.h"
#include "nsIXULAppInfo.h"
@ -186,6 +188,10 @@ nsHttpHandler::nsHttpHandler()
, mSpdyPingTimeout(PR_SecondsToInterval(8))
, mConnectTimeout(90000)
, mParallelSpeculativeConnectLimit(6)
, mRequestTokenBucketEnabled(false)
, mRequestTokenBucketMinParallelism(6)
, mRequestTokenBucketHz(100)
, mRequestTokenBucketBurst(32)
, mCritialRequestPrioritization(true)
{
#if defined(PR_LOGGING)
@ -331,9 +337,22 @@ nsHttpHandler::Init()
mObserverService->AddObserver(this, "webapps-clear-data", true);
}
MakeNewRequestTokenBucket();
return NS_OK;
}
void
nsHttpHandler::MakeNewRequestTokenBucket()
{
if (!mConnMgr)
return;
nsRefPtr<mozilla::net::EventTokenBucket> tokenBucket =
new mozilla::net::EventTokenBucket(mRequestTokenBucketHz,
mRequestTokenBucketBurst);
mConnMgr->UpdateRequestTokenBucket(tokenBucket);
}
nsresult
nsHttpHandler::InitConnectionMgr()
{
@ -1197,6 +1216,40 @@ nsHttpHandler::PrefsChanged(nsIPrefBranch *prefs, const char *pref)
}
}
if (PREF_CHANGED(HTTP_PREF("pacing.requests.enabled"))) {
rv = prefs->GetBoolPref(HTTP_PREF("pacing.requests.enabled"),
&cVar);
if (NS_SUCCEEDED(rv))
mRequestTokenBucketEnabled = cVar;
}
if (PREF_CHANGED(HTTP_PREF("pacing.requests.min-parallelism"))) {
rv = prefs->GetIntPref(HTTP_PREF("pacing.requests.min-parallelism"), &val);
if (NS_SUCCEEDED(rv))
mRequestTokenBucketMinParallelism = static_cast<uint16_t>(clamped(val, 1, 1024));
}
bool requestTokenBucketUpdated = false;
if (PREF_CHANGED(HTTP_PREF("pacing.requests.hz"))) {
rv = prefs->GetIntPref(HTTP_PREF("pacing.requests.hz"), &val);
if (NS_SUCCEEDED(rv)) {
mRequestTokenBucketHz = static_cast<uint32_t>(clamped(val, 1, 10000));
requestTokenBucketUpdated = true;
}
}
if (PREF_CHANGED(HTTP_PREF("pacing.requests.burst"))) {
rv = prefs->GetIntPref(HTTP_PREF("pacing.requests.burst"), &val);
if (NS_SUCCEEDED(rv)) {
mRequestTokenBucketBurst = val ? val : 1;
requestTokenBucketUpdated = true;
}
}
if (requestTokenBucketUpdated) {
mRequestTokenBucket =
new mozilla::net::EventTokenBucket(mRequestTokenBucketHz,
mRequestTokenBucketBurst);
}
//
// Tracking options
//
@ -1270,6 +1323,9 @@ nsHttpHandler::PrefsChanged(nsIPrefBranch *prefs, const char *pref)
}
}
}
if (requestTokenBucketUpdated) {
MakeNewRequestTokenBucket();
}
#undef PREF_CHANGED
#undef MULTI_PREF_CHANGED

Просмотреть файл

@ -35,6 +35,14 @@ class nsHttpTransaction;
class nsAHttpTransaction;
class nsIHttpChannel;
class nsIPrefBranch;
class nsICancelable;
namespace mozilla {
namespace net {
class ATokenBucketEvent;
class EventTokenBucket;
}
}
//-----------------------------------------------------------------------------
// nsHttpHandler - protocol handler for HTTP and HTTPS
@ -99,6 +107,9 @@ public:
uint32_t ParallelSpeculativeConnectLimit() { return mParallelSpeculativeConnectLimit; }
bool CritialRequestPrioritization() { return mCritialRequestPrioritization; }
bool UseRequestTokenBucket() { return mRequestTokenBucketEnabled; }
uint16_t RequestTokenBucketMinParallelism() { return mRequestTokenBucketMinParallelism; }
bool PromptTempRedirect() { return mPromptTempRedirect; }
nsHttpAuthCache *AuthCache(bool aPrivate) {
@ -408,12 +419,39 @@ private:
// when starting a new speculative connection.
uint32_t mParallelSpeculativeConnectLimit;
// For Rate Pacing of HTTP/1 requests through a netwerk/base/src/EventTokenBucket
// Active requests <= *MinParallelism are not subject to the rate pacing
bool mRequestTokenBucketEnabled;
uint16_t mRequestTokenBucketMinParallelism;
uint32_t mRequestTokenBucketHz; // EventTokenBucket HZ
uint32_t mRequestTokenBucketBurst; // EventTokenBucket Burst
// Whether or not to block requests for non head js/css items (e.g. media)
// while those elements load.
bool mCritialRequestPrioritization;
};
//-----------------------------------------------------------------------------
private:
// For Rate Pacing Certain Network Events. Only assign this pointer on
// socket thread.
void MakeNewRequestTokenBucket();
nsRefPtr<mozilla::net::EventTokenBucket> mRequestTokenBucket;
public:
// Socket thread only
nsresult SubmitPacedRequest(mozilla::net::ATokenBucketEvent *event,
nsICancelable **cancel)
{
if (!mRequestTokenBucket)
return NS_ERROR_UNEXPECTED;
return mRequestTokenBucket->SubmitEvent(event, cancel);
}
// Socket thread only
void SetRequestTokenBucket(mozilla::net::EventTokenBucket *aTokenBucket)
{
mRequestTokenBucket = aTokenBucket;
}
};
extern nsHttpHandler *gHttpHandler;

Просмотреть файл

@ -115,6 +115,9 @@ nsHttpTransaction::nsHttpTransaction()
, mReportedResponseHeader(false)
, mForTakeResponseHead(nullptr)
, mResponseHeadTaken(false)
, mSubmittedRatePacing(false)
, mPassedRatePacing(false)
, mSynchronousRatePaceRequest(false)
{
LOG(("Creating nsHttpTransaction @%x\n", this));
gHttpHandler->GetMaxPipelineObjectSize(&mMaxPipelineObjectSize);
@ -124,6 +127,11 @@ nsHttpTransaction::~nsHttpTransaction()
{
LOG(("Destroying nsHttpTransaction @%x\n", this));
if (mTokenBucketCancel) {
mTokenBucketCancel->Cancel(NS_ERROR_ABORT);
mTokenBucketCancel = nullptr;
}
// Force the callbacks to be released right now
mCallbacks = nullptr;
@ -688,6 +696,11 @@ nsHttpTransaction::Close(nsresult reason)
return;
}
if (mTokenBucketCancel) {
mTokenBucketCancel->Cancel(reason);
mTokenBucketCancel = nullptr;
}
if (mActivityDistributor) {
// report the reponse is complete if not already reported
if (!mResponseIsComplete)
@ -1616,6 +1629,38 @@ nsHttpTransaction::DeleteSelfOnConsumerThread()
}
}
bool
nsHttpTransaction::TryToRunPacedRequest()
{
if (mSubmittedRatePacing)
return mPassedRatePacing;
mSubmittedRatePacing = true;
mSynchronousRatePaceRequest = true;
gHttpHandler->SubmitPacedRequest(this, getter_AddRefs(mTokenBucketCancel));
mSynchronousRatePaceRequest = false;
return mPassedRatePacing;
}
void
nsHttpTransaction::OnTokenBucketAdmitted()
{
mPassedRatePacing = true;
mTokenBucketCancel = nullptr;
if (!mSynchronousRatePaceRequest)
gHttpHandler->ConnMgr()->ProcessPendingQ(mConnInfo);
}
void
nsHttpTransaction::CancelPacing(nsresult reason)
{
if (mTokenBucketCancel) {
mTokenBucketCancel->Cancel(reason);
mTokenBucketCancel = nullptr;
}
}
//-----------------------------------------------------------------------------
// nsHttpTransaction::nsISupports
//-----------------------------------------------------------------------------

Просмотреть файл

@ -10,6 +10,7 @@
#include "nsHttpHeaderArray.h"
#include "nsAHttpTransaction.h"
#include "nsAHttpConnection.h"
#include "EventTokenBucket.h"
#include "nsCOMPtr.h"
#include "nsIPipe.h"
@ -37,6 +38,7 @@ class UpdateSecurityCallbacks;
//-----------------------------------------------------------------------------
class nsHttpTransaction : public nsAHttpTransaction
, public mozilla::net::ATokenBucketEvent
, public nsIInputStreamCallback
, public nsIOutputStreamCallback
{
@ -309,6 +311,32 @@ private:
// true when ::Set has been called with a response header
bool mSetup;
} mRestartInProgressVerifier;
// For Rate Pacing via an EventTokenBucket
public:
// called by the connection manager to run this transaction through the
// token bucket. If the token bucket admits the transaction immediately it
// returns true. The function is called repeatedly until it returns true.
bool TryToRunPacedRequest();
// ATokenBucketEvent pure virtual implementation. Called by the token bucket
// when the transaction is ready to run. If this happens asynchrounously to
// token bucket submission the transaction just posts an event that causes
// the pending transaction queue to be rerun (and TryToRunPacedRequest() to
// be run again.
void OnTokenBucketAdmitted(); // ATokenBucketEvent
// CancelPacing() can be used to tell the token bucket to remove this
// transaction from the list of pending transactions. This is used when a
// transaction is believed to be HTTP/1 (and thus subject to rate pacing)
// but later can be dispatched via spdy (not subject to rate pacing).
void CancelPacing(nsresult reason);
private:
bool mSubmittedRatePacing;
bool mPassedRatePacing;
bool mSynchronousRatePaceRequest;
nsCOMPtr<nsICancelable> mTokenBucketCancel;
};
#endif // nsHttpTransaction_h__