Bug 1497249 - P1: Introduce nsWebSocketConnection interface for separating socket manipulation r=michal,necko-reviewers

Differential Revision: https://phabricator.services.mozilla.com/D30023
This commit is contained in:
Kershaw Chang 2020-07-16 12:59:23 +00:00
Родитель 56fab6aa82
Коммит 1e225df236
6 изменённых файлов: 480 добавлений и 222 удалений

Просмотреть файл

@ -56,6 +56,7 @@
#include "mozilla/TimeStamp.h" #include "mozilla/TimeStamp.h"
#include "nsSocketTransportService2.h" #include "nsSocketTransportService2.h"
#include "nsINSSErrorsService.h" #include "nsINSSErrorsService.h"
#include "nsWebSocketConnection.h"
#include "plbase64.h" #include "plbase64.h"
#include "prmem.h" #include "prmem.h"
@ -75,10 +76,10 @@ namespace net {
NS_IMPL_ISUPPORTS(WebSocketChannel, nsIWebSocketChannel, nsIHttpUpgradeListener, NS_IMPL_ISUPPORTS(WebSocketChannel, nsIWebSocketChannel, nsIHttpUpgradeListener,
nsIRequestObserver, nsIStreamListener, nsIProtocolHandler, nsIRequestObserver, nsIStreamListener, nsIProtocolHandler,
nsIInputStreamCallback, nsIOutputStreamCallback,
nsITimerCallback, nsIDNSListener, nsIProtocolProxyCallback, nsITimerCallback, nsIDNSListener, nsIProtocolProxyCallback,
nsIInterfaceRequestor, nsIChannelEventSink, nsIInterfaceRequestor, nsIChannelEventSink,
nsIThreadRetargetableRequest, nsIObserver, nsINamed) nsIThreadRetargetableRequest, nsIObserver, nsINamed,
nsIWebSocketConnectionListener)
// We implement RFC 6455, which uses Sec-WebSocket-Version: 13 on the wire. // We implement RFC 6455, which uses Sec-WebSocket-Version: 13 on the wire.
#define SEC_WEBSOCKET_VERSION "13" #define SEC_WEBSOCKET_VERSION "13"
@ -1124,8 +1125,7 @@ WebSocketChannel::WebSocketChannel()
mBuffered(0), mBuffered(0),
mBufferSize(kIncomingBufferInitialSize), mBufferSize(kIncomingBufferInitialSize),
mCurrentOut(nullptr), mCurrentOut(nullptr),
mCurrentOutSent(0), mHdrOutSize(0),
mHdrOutToSend(0),
mHdrOut(nullptr), mHdrOut(nullptr),
mDynamicOutputSize(0), mDynamicOutputSize(0),
mDynamicOutput(nullptr), mDynamicOutput(nullptr),
@ -1898,7 +1898,7 @@ void WebSocketChannel::EnqueueOutgoingMessage(nsDeque<OutboundMessage>& aQueue,
this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length())); this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length()));
aQueue.Push(aMsg); aQueue.Push(aMsg);
OnOutputStreamReady(mSocketOut); DoEnqueueOutgoingMessage();
} }
uint16_t WebSocketChannel::ResultToCloseCode(nsresult resultCode) { uint16_t WebSocketChannel::ResultToCloseCode(nsresult resultCode) {
@ -1948,7 +1948,6 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
"%p found queued msg %p [type=%s len=%d]\n", "%p found queued msg %p [type=%s len=%d]\n",
this, mCurrentOut, msgNames[msgType], mCurrentOut->Length())); this, mCurrentOut, msgNames[msgType], mCurrentOut->Length()));
mCurrentOutSent = 0;
mHdrOut = mOutHeader; mHdrOut = mOutHeader;
uint8_t maskBit = mIsServerSide ? 0 : kMaskBit; uint8_t maskBit = mIsServerSide ? 0 : kMaskBit;
@ -1979,25 +1978,25 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
if (mScriptCloseCode) { if (mScriptCloseCode) {
NetworkEndian::writeUint16(payload, mScriptCloseCode); NetworkEndian::writeUint16(payload, mScriptCloseCode);
mOutHeader[1] += 2; mOutHeader[1] += 2;
mHdrOutToSend = 4 + maskSize; mHdrOutSize = 4 + maskSize;
if (!mScriptCloseReason.IsEmpty()) { if (!mScriptCloseReason.IsEmpty()) {
MOZ_ASSERT(mScriptCloseReason.Length() <= 123, MOZ_ASSERT(mScriptCloseReason.Length() <= 123,
"Close Reason Too Long"); "Close Reason Too Long");
mOutHeader[1] += mScriptCloseReason.Length(); mOutHeader[1] += mScriptCloseReason.Length();
mHdrOutToSend += mScriptCloseReason.Length(); mHdrOutSize += mScriptCloseReason.Length();
memcpy(payload + 2, mScriptCloseReason.BeginReading(), memcpy(payload + 2, mScriptCloseReason.BeginReading(),
mScriptCloseReason.Length()); std::min<uint32_t>(mScriptCloseReason.Length(), 123));
} }
} else { } else {
// No close code/reason, so payload length = 0. We must still send mask // No close code/reason, so payload length = 0. We must still send mask
// even though it's not used. Keep payload offset so we write mask // even though it's not used. Keep payload offset so we write mask
// below. // below.
mHdrOutToSend = 2 + maskSize; mHdrOutSize = 2 + maskSize;
} }
} else { } else {
NetworkEndian::writeUint16(payload, ResultToCloseCode(mStopOnClose)); NetworkEndian::writeUint16(payload, ResultToCloseCode(mStopOnClose));
mOutHeader[1] += 2; mOutHeader[1] += 2;
mHdrOutToSend = 4 + maskSize; mHdrOutSize = 4 + maskSize;
} }
if (mServerClosed) { if (mServerClosed) {
@ -2065,18 +2064,18 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
if (mCurrentOut->Length() < 126) { if (mCurrentOut->Length() < 126) {
mOutHeader[1] = mCurrentOut->Length() | maskBit; mOutHeader[1] = mCurrentOut->Length() | maskBit;
mHdrOutToSend = 2 + maskSize; mHdrOutSize = 2 + maskSize;
} else if (mCurrentOut->Length() <= 0xffff) { } else if (mCurrentOut->Length() <= 0xffff) {
mOutHeader[1] = 126 | maskBit; mOutHeader[1] = 126 | maskBit;
NetworkEndian::writeUint16(mOutHeader + sizeof(uint16_t), NetworkEndian::writeUint16(mOutHeader + sizeof(uint16_t),
mCurrentOut->Length()); mCurrentOut->Length());
mHdrOutToSend = 4 + maskSize; mHdrOutSize = 4 + maskSize;
} else { } else {
mOutHeader[1] = 127 | maskBit; mOutHeader[1] = 127 | maskBit;
NetworkEndian::writeUint64(mOutHeader + 2, mCurrentOut->Length()); NetworkEndian::writeUint64(mOutHeader + 2, mCurrentOut->Length());
mHdrOutToSend = 10 + maskSize; mHdrOutSize = 10 + maskSize;
} }
payload = mOutHeader + mHdrOutToSend; payload = mOutHeader + mHdrOutSize;
} }
MOZ_ASSERT(payload, "payload offset not found"); MOZ_ASSERT(payload, "payload offset not found");
@ -2118,7 +2117,7 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
mOutHeader[0] & WebSocketChannel::kRsv3Bit, mOutHeader[0] & WebSocketChannel::kRsv3Bit,
mOutHeader[0] & WebSocketChannel::kOpcodeBitsMask, mOutHeader[0] & WebSocketChannel::kOpcodeBitsMask,
mOutHeader[1] & WebSocketChannel::kMaskBit, mask, payload, mOutHeader[1] & WebSocketChannel::kMaskBit, mask, payload,
mHdrOutToSend - (payload - mOutHeader), mCurrentOut->BeginOrigReading(), mHdrOutSize - (payload - mOutHeader), mCurrentOut->BeginOrigReading(),
mCurrentOut->OrigLength()); mCurrentOut->OrigLength());
if (frame) { if (frame) {
@ -2126,7 +2125,7 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
} }
if (mask) { if (mask) {
while (payload < (mOutHeader + mHdrOutToSend)) { while (payload < (mOutHeader + mHdrOutSize)) {
*payload ^= mask >> 24; *payload ^= mask >> 24;
mask = RotateLeft(mask, 8); mask = RotateLeft(mask, 8);
payload++; payload++;
@ -2136,19 +2135,8 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
ApplyMask(mask, mCurrentOut->BeginWriting(), mCurrentOut->Length()); ApplyMask(mask, mCurrentOut->BeginWriting(), mCurrentOut->Length());
} }
int32_t len = mCurrentOut->Length(); // Transmitting begins - mHdrOutSize bytes from mOutHeader and
// mCurrentOut->Length() bytes from mCurrentOut.
// for small frames, copy it all together for a contiguous write
if (len && len <= kCopyBreak) {
memcpy(mOutHeader + mHdrOutToSend, mCurrentOut->BeginWriting(), len);
mHdrOutToSend += len;
mCurrentOutSent = len;
}
// Transmitting begins - mHdrOutToSend bytes from mOutHeader and
// mCurrentOut->Length() bytes from mCurrentOut. The latter may be
// coaleseced into the former for small messages or as the result of the
// compression process.
cleanupAfterFailure.release(); cleanupAfterFailure.release();
} }
@ -2156,7 +2144,6 @@ void WebSocketChannel::PrimeNewOutgoingMessage() {
void WebSocketChannel::DeleteCurrentOutGoingMessage() { void WebSocketChannel::DeleteCurrentOutGoingMessage() {
delete mCurrentOut; delete mCurrentOut;
mCurrentOut = nullptr; mCurrentOut = nullptr;
mCurrentOutSent = 0;
} }
void WebSocketChannel::EnsureHdrOut(uint32_t size) { void WebSocketChannel::EnsureHdrOut(uint32_t size) {
@ -2202,23 +2189,9 @@ void WebSocketChannel::CleanupConnection() {
mLingeringCloseTimer = nullptr; mLingeringCloseTimer = nullptr;
} }
if (mSocketIn) { if (mConnection) {
if (mDataStarted) { mConnection->Close();
mSocketIn->AsyncWait(nullptr, 0, 0, nullptr); mConnection = nullptr;
}
mSocketIn = nullptr;
}
if (mSocketOut) {
mSocketOut->AsyncWait(nullptr, 0, 0, nullptr);
mSocketOut = nullptr;
}
if (mTransport) {
mTransport->SetSecurityCallbacks(nullptr);
mTransport->SetEventSink(nullptr, nullptr);
mTransport->Close(NS_BASE_STREAM_CLOSED);
mTransport = nullptr;
} }
if (mConnectionLogService && !mPrivateBrowsing) { if (mConnectionLogService && !mPrivateBrowsing) {
@ -2286,30 +2259,15 @@ void WebSocketChannel::DoStopSession(nsresult reason) {
mPingTimer = nullptr; mPingTimer = nullptr;
} }
if (mSocketIn && !mTCPClosed && mDataStarted) { if (mConnection && !mTCPClosed && mDataStarted) {
// Drain, within reason, this socket. if we leave any data // Drain, within reason, this socket.
// unconsumed (including the tcp fin) a RST will be generated mConnection->DrainSocketData();
// The right thing to do here is shutdown(SHUT_WR) and then wait
// a little while to see if any data comes in.. but there is no
// reason to delay things for that when the websocket handshake
// is supposed to guarantee a quiet connection except for that fin.
char buffer[512];
uint32_t count = 0;
uint32_t total = 0;
nsresult rv;
do {
total += count;
rv = mSocketIn->Read(buffer, 512, &count);
if (rv != NS_BASE_STREAM_WOULD_BLOCK && (NS_FAILED(rv) || count == 0))
mTCPClosed = true;
} while (NS_SUCCEEDED(rv) && count > 0 && total < 32000);
} }
int32_t sessionCount = kLingeringCloseThreshold; int32_t sessionCount = kLingeringCloseThreshold;
nsWSAdmissionManager::GetSessionCount(sessionCount); nsWSAdmissionManager::GetSessionCount(sessionCount);
if (!mTCPClosed && mTransport && sessionCount < kLingeringCloseThreshold) { if (!mTCPClosed && mConnection && sessionCount < kLingeringCloseThreshold) {
// 7.1.1 says that the client SHOULD wait for the server to close the TCP // 7.1.1 says that the client SHOULD wait for the server to close the TCP
// connection. This is so we can reuse port numbers before 2 MSL expires, // connection. This is so we can reuse port numbers before 2 MSL expires,
// which is not really as much of a concern for us as the amount of state // which is not really as much of a concern for us as the amount of state
@ -2376,7 +2334,7 @@ void WebSocketChannel::AbortSession(nsresult reason) {
return; return;
} }
if (mTransport && reason != NS_BASE_STREAM_CLOSED && !mRequestedClose && if (mConnection && reason != NS_BASE_STREAM_CLOSED && !mRequestedClose &&
!mClientClosed && !mServerClosed && mDataStarted) { !mClientClosed && !mServerClosed && mDataStarted) {
mRequestedClose = true; mRequestedClose = true;
mStopOnClose = reason; mStopOnClose = reason;
@ -2854,10 +2812,11 @@ nsresult WebSocketChannel::StartWebsocketData() {
mDataStarted = true; mDataStarted = true;
} }
rv = mSocketIn->AsyncWait(this, 0, 0, mSocketThread); rv = mConnection->StartReading();
if (NS_FAILED(rv)) { if (NS_FAILED(rv)) {
LOG( LOG(
("WebSocketChannel::StartWebsocketData mSocketIn->AsyncWait() failed " ("WebSocketChannel::StartWebsocketData mConnection->StartReading() "
"failed "
"with error 0x%08" PRIx32, "with error 0x%08" PRIx32,
static_cast<uint32_t>(rv))); static_cast<uint32_t>(rv)));
return mSocketThread->Dispatch( return mSocketThread->Dispatch(
@ -2866,18 +2825,24 @@ nsresult WebSocketChannel::StartWebsocketData() {
NS_DISPATCH_NORMAL); NS_DISPATCH_NORMAL);
} }
if (mPingInterval) { RefPtr<WebSocketChannel> self = this;
rv = mSocketThread->Dispatch( rv = mSocketThread->Dispatch(
NewRunnableMethod("net::WebSocketChannel::StartPinging", this, NS_NewRunnableFunction("net::WebSocketChannel::StartPinging",
&WebSocketChannel::StartPinging), [self]() {
NS_DISPATCH_NORMAL); if (self->mPingInterval) {
if (NS_FAILED(rv)) { Unused << self->StartPinging();
LOG( }
("WebSocketChannel::StartWebsocketData Could not start pinging, " // Try to send the messages in the queue out
"rv=0x%08" PRIx32, // immediately.
static_cast<uint32_t>(rv))); self->DoEnqueueOutgoingMessage();
return rv; }),
} NS_DISPATCH_NORMAL);
if (NS_FAILED(rv)) {
LOG(
("WebSocketChannel::StartWebsocketData Could not start pinging, "
"rv=0x%08" PRIx32,
static_cast<uint32_t>(rv)));
return rv;
} }
LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p", LOG(("WebSocketChannel::StartWebsocketData Notifying Listener %p",
@ -3211,8 +3176,8 @@ WebSocketChannel::GetSecurityInfo(nsISupports** aSecurityInfo) {
LOG(("WebSocketChannel::GetSecurityInfo() %p\n", this)); LOG(("WebSocketChannel::GetSecurityInfo() %p\n", this));
MOZ_ASSERT(NS_IsMainThread(), "not main thread"); MOZ_ASSERT(NS_IsMainThread(), "not main thread");
if (mTransport) { if (mConnection) {
if (NS_FAILED(mTransport->GetSecurityInfo(aSecurityInfo))) if (NS_FAILED(mConnection->GetSecurityInfo(aSecurityInfo)))
*aSecurityInfo = nullptr; *aSecurityInfo = nullptr;
} }
return NS_OK; return NS_OK;
@ -3562,14 +3527,8 @@ WebSocketChannel::OnTransportAvailable(nsISocketTransport* aTransport,
MOZ_ASSERT(!mRecvdHttpUpgradeTransport, "OTA duplicated"); MOZ_ASSERT(!mRecvdHttpUpgradeTransport, "OTA duplicated");
MOZ_ASSERT(aSocketIn, "OTA with invalid socketIn"); MOZ_ASSERT(aSocketIn, "OTA with invalid socketIn");
mTransport = aTransport; mConnection = new nsWebSocketConnection(aTransport, aSocketIn, aSocketOut);
mSocketIn = aSocketIn; nsresult rv = mConnection->Init(this, mSocketThread);
mSocketOut = aSocketOut;
nsresult rv;
rv = mTransport->SetEventSink(nullptr, nullptr);
if (NS_FAILED(rv)) return rv;
rv = mTransport->SetSecurityCallbacks(this);
if (NS_FAILED(rv)) return rv; if (NS_FAILED(rv)) return rv;
mRecvdHttpUpgradeTransport = 1; mRecvdHttpUpgradeTransport = 1;
@ -3861,133 +3820,40 @@ WebSocketChannel::OnStopRequest(nsIRequest* aRequest, nsresult aStatusCode) {
return NS_OK; return NS_OK;
} }
// nsIInputStreamCallback void WebSocketChannel::DoEnqueueOutgoingMessage() {
LOG(("WebSocketChannel::DoEnqueueOutgoingMessage() %p\n", this));
NS_IMETHODIMP
WebSocketChannel::OnInputStreamReady(nsIAsyncInputStream* aStream) {
LOG(("WebSocketChannel::OnInputStreamReady() %p\n", this));
MOZ_ASSERT(OnSocketThread(), "not on socket thread"); MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!mSocketIn) // did we we clean up the socket after scheduling InputReady?
return NS_OK;
// this is after the http upgrade - so we are speaking websockets
char buffer[2048];
uint32_t count;
nsresult rv;
do {
rv = mSocketIn->Read((char*)buffer, 2048, &count);
LOG(("WebSocketChannel::OnInputStreamReady: read %u rv %" PRIx32 "\n",
count, static_cast<uint32_t>(rv)));
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketIn->AsyncWait(this, 0, 0, mSocketThread);
return NS_OK;
}
if (NS_FAILED(rv)) {
AbortSession(rv);
return rv;
}
if (count == 0) {
AbortSession(NS_BASE_STREAM_CLOSED);
return NS_OK;
}
if (mStopped) {
continue;
}
rv = ProcessInput((uint8_t*)buffer, count);
if (NS_FAILED(rv)) {
AbortSession(rv);
return rv;
}
} while (NS_SUCCEEDED(rv) && mSocketIn);
return NS_OK;
}
// nsIOutputStreamCallback
NS_IMETHODIMP
WebSocketChannel::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
LOG(("WebSocketChannel::OnOutputStreamReady() %p\n", this));
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
nsresult rv;
if (!mCurrentOut) PrimeNewOutgoingMessage(); if (!mCurrentOut) PrimeNewOutgoingMessage();
while (mCurrentOut && mSocketOut) { while (mCurrentOut && mConnection) {
const char* sndBuf; LOG(
uint32_t toSend; ("WebSocketChannel::DoEnqueueOutgoingMessage: "
uint32_t amtSent; "Try to send %u of hdr/copybreak and %u of data\n",
mHdrOutSize, mCurrentOut->Length()));
if (mHdrOut) { nsresult rv = mConnection->EnqueueOutputData(
sndBuf = (const char*)mHdrOut; mHdrOut, mHdrOutSize, (uint8_t*)mCurrentOut->BeginReading(),
toSend = mHdrOutToSend; mCurrentOut->Length());
LOG(
("WebSocketChannel::OnOutputStreamReady: " LOG(("WebSocketChannel::DoEnqueueOutgoingMessage: rv %" PRIx32 "\n",
"Try to send %u of hdr/copybreak\n", static_cast<uint32_t>(rv)));
toSend));
} else { if (NS_FAILED(rv)) {
sndBuf = (char*)mCurrentOut->BeginReading() + mCurrentOutSent; AbortSession(rv);
toSend = mCurrentOut->Length() - mCurrentOutSent; return;
if (toSend > 0) {
LOG(
("WebSocketChannel::OnOutputStreamReady: "
"Try to send %u of data\n",
toSend));
}
} }
if (toSend == 0) { if (!mStopped) {
amtSent = 0; mTargetThread->Dispatch(
} else { new CallAcknowledge(this, mCurrentOut->OrigLength()),
rv = mSocketOut->Write(sndBuf, toSend, &amtSent); NS_DISPATCH_NORMAL);
LOG(("WebSocketChannel::OnOutputStreamReady: write %u rv %" PRIx32 "\n",
amtSent, static_cast<uint32_t>(rv)));
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketOut->AsyncWait(this, 0, 0, mSocketThread);
return NS_OK;
}
if (NS_FAILED(rv)) {
AbortSession(rv);
return NS_OK;
}
}
if (mHdrOut) {
if (amtSent == toSend) {
mHdrOut = nullptr;
mHdrOutToSend = 0;
} else {
mHdrOut += amtSent;
mHdrOutToSend -= amtSent;
mSocketOut->AsyncWait(this, 0, 0, mSocketThread);
}
} else {
if (amtSent == toSend) {
if (!mStopped) {
mTargetThread->Dispatch(
new CallAcknowledge(this, mCurrentOut->OrigLength()),
NS_DISPATCH_NORMAL);
}
DeleteCurrentOutGoingMessage();
PrimeNewOutgoingMessage();
} else {
mCurrentOutSent += amtSent;
mSocketOut->AsyncWait(this, 0, 0, mSocketThread);
}
} }
DeleteCurrentOutGoingMessage();
PrimeNewOutgoingMessage();
} }
if (mReleaseOnTransmit) ReleaseSession(); if (mReleaseOnTransmit) ReleaseSession();
return NS_OK;
} }
// nsIStreamListener // nsIStreamListener
@ -4010,6 +3876,28 @@ WebSocketChannel::OnDataAvailable(nsIRequest* aRequest,
return NS_OK; return NS_OK;
} }
NS_IMETHODIMP
WebSocketChannel::OnError(nsresult aStatus) {
AbortSession(aStatus);
return NS_OK;
}
NS_IMETHODIMP
WebSocketChannel::OnTCPClosed() {
if (mLingeringCloseTimer) {
MOZ_ASSERT(mStopped, "Lingering without Stop");
LOG(("WebSocketChannel:: Cleanup connection based on TCP Close"));
CleanupConnection();
}
return NS_OK;
}
NS_IMETHODIMP
WebSocketChannel::OnDataReceived(uint8_t* aData, uint32_t aCount) {
return ProcessInput(aData, aCount);
}
} // namespace net } // namespace net
} // namespace mozilla } // namespace mozilla

Просмотреть файл

@ -19,6 +19,7 @@
#include "nsIProtocolProxyCallback.h" #include "nsIProtocolProxyCallback.h"
#include "nsIChannelEventSink.h" #include "nsIChannelEventSink.h"
#include "nsIHttpChannelInternal.h" #include "nsIHttpChannelInternal.h"
#include "nsIWebSocketConnection.h"
#include "BaseWebSocketChannel.h" #include "BaseWebSocketChannel.h"
#include "nsCOMPtr.h" #include "nsCOMPtr.h"
@ -63,15 +64,14 @@ enum wsConnectingState {
class WebSocketChannel : public BaseWebSocketChannel, class WebSocketChannel : public BaseWebSocketChannel,
public nsIHttpUpgradeListener, public nsIHttpUpgradeListener,
public nsIStreamListener, public nsIStreamListener,
public nsIInputStreamCallback,
public nsIOutputStreamCallback,
public nsITimerCallback, public nsITimerCallback,
public nsIDNSListener, public nsIDNSListener,
public nsIObserver, public nsIObserver,
public nsIProtocolProxyCallback, public nsIProtocolProxyCallback,
public nsIInterfaceRequestor, public nsIInterfaceRequestor,
public nsIChannelEventSink, public nsIChannelEventSink,
public nsINamed { public nsINamed,
public nsIWebSocketConnectionListener {
friend class WebSocketFrame; friend class WebSocketFrame;
public: public:
@ -79,8 +79,6 @@ class WebSocketChannel : public BaseWebSocketChannel,
NS_DECL_NSIHTTPUPGRADELISTENER NS_DECL_NSIHTTPUPGRADELISTENER
NS_DECL_NSIREQUESTOBSERVER NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMLISTENER NS_DECL_NSISTREAMLISTENER
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIOUTPUTSTREAMCALLBACK
NS_DECL_NSITIMERCALLBACK NS_DECL_NSITIMERCALLBACK
NS_DECL_NSIDNSLISTENER NS_DECL_NSIDNSLISTENER
NS_DECL_NSIPROTOCOLPROXYCALLBACK NS_DECL_NSIPROTOCOLPROXYCALLBACK
@ -88,6 +86,7 @@ class WebSocketChannel : public BaseWebSocketChannel,
NS_DECL_NSICHANNELEVENTSINK NS_DECL_NSICHANNELEVENTSINK
NS_DECL_NSIOBSERVER NS_DECL_NSIOBSERVER
NS_DECL_NSINAMED NS_DECL_NSINAMED
NS_DECL_NSIWEBSOCKETCONNECTIONLISTENER
// nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us
// //
@ -142,6 +141,7 @@ class WebSocketChannel : public BaseWebSocketChannel,
void EnqueueOutgoingMessage(nsDeque<OutboundMessage>& aQueue, void EnqueueOutgoingMessage(nsDeque<OutboundMessage>& aQueue,
OutboundMessage* aMsg); OutboundMessage* aMsg);
void DoEnqueueOutgoingMessage();
void PrimeNewOutgoingMessage(); void PrimeNewOutgoingMessage();
void DeleteCurrentOutGoingMessage(); void DeleteCurrentOutGoingMessage();
@ -212,9 +212,7 @@ class WebSocketChannel : public BaseWebSocketChannel,
nsCString mHost; nsCString mHost;
nsString mEffectiveURL; nsString mEffectiveURL;
nsCOMPtr<nsISocketTransport> mTransport; nsCOMPtr<nsIWebSocketConnection> mConnection;
nsCOMPtr<nsIAsyncInputStream> mSocketIn;
nsCOMPtr<nsIAsyncOutputStream> mSocketOut;
nsCOMPtr<nsITimer> mCloseTimer; nsCOMPtr<nsITimer> mCloseTimer;
uint32_t mCloseTimeout; /* milliseconds */ uint32_t mCloseTimeout; /* milliseconds */
@ -280,17 +278,16 @@ class WebSocketChannel : public BaseWebSocketChannel,
uint32_t mBuffered; uint32_t mBuffered;
uint32_t mBufferSize; uint32_t mBufferSize;
// These are for the send buffers
const static int32_t kCopyBreak = 1000;
OutboundMessage* mCurrentOut; OutboundMessage* mCurrentOut;
uint32_t mCurrentOutSent;
nsDeque<OutboundMessage> mOutgoingMessages; nsDeque<OutboundMessage> mOutgoingMessages;
nsDeque<OutboundMessage> mOutgoingPingMessages; nsDeque<OutboundMessage> mOutgoingPingMessages;
nsDeque<OutboundMessage> mOutgoingPongMessages; nsDeque<OutboundMessage> mOutgoingPongMessages;
uint32_t mHdrOutToSend; uint32_t mHdrOutSize;
uint8_t* mHdrOut; uint8_t* mHdrOut;
uint8_t mOutHeader[kCopyBreak + 16]; // This is used to store the frame header and the close reason.
// Since the length of close reason can not be larger than 123, 256 is
// enough here.
uint8_t mOutHeader[256];
UniquePtr<PMCECompression> mPMCECompressor; UniquePtr<PMCECompression> mPMCECompressor;
uint32_t mDynamicOutputSize; uint32_t mDynamicOutputSize;
uint8_t* mDynamicOutput; uint8_t* mDynamicOutput;

Просмотреть файл

@ -10,6 +10,7 @@ with Files('**'):
XPIDL_SOURCES += [ XPIDL_SOURCES += [
'nsITransportProvider.idl', 'nsITransportProvider.idl',
'nsIWebSocketChannel.idl', 'nsIWebSocketChannel.idl',
'nsIWebSocketConnection.idl',
'nsIWebSocketEventService.idl', 'nsIWebSocketEventService.idl',
'nsIWebSocketListener.idl', 'nsIWebSocketListener.idl',
] ]
@ -31,6 +32,7 @@ EXPORTS.mozilla.net += [
UNIFIED_SOURCES += [ UNIFIED_SOURCES += [
'BaseWebSocketChannel.cpp', 'BaseWebSocketChannel.cpp',
'IPCTransportProvider.cpp', 'IPCTransportProvider.cpp',
'nsWebSocketConnection.cpp',
'WebSocketChannel.cpp', 'WebSocketChannel.cpp',
'WebSocketChannelChild.cpp', 'WebSocketChannelChild.cpp',
'WebSocketChannelParent.cpp', 'WebSocketChannelParent.cpp',

Просмотреть файл

@ -0,0 +1,74 @@
/* vim:set ts=4 sw=4 et cindent: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nsISupports.idl"
interface nsIEventTarget;
interface nsIWebSocketConnectionListener;
/**
* nsIWebSocketConnection
*
* An internal interface that only uses for WebSocketChannel.
* Provides methods for sending and receving data.
*/
[uuid(4256eb9e-61eb-4ec9-b8c6-b68aee3ba390)]
interface nsIWebSocketConnection : nsISupports
{
/**
* Initialize a WebSocketConnection.
*
* @param aListener
* The listener to be notified when data is recevied or
* an error happened.
* @param aEventTarget
* The event target where the listener's methods will be called.
*/
void init(in nsIWebSocketConnectionListener aListener, in nsIEventTarget aEventTarget);
/**
* Close the connection.
*/
void close();
/**
* Put the outgoing data into a queue.
*/
void EnqueueOutputData([const, array, size_is(aHdrBufLength)]in uint8_t aHdrBuf,
in unsigned long aHdrBufLength,
[const, array, size_is(aPayloadBufLength)]in uint8_t aPayloadBuf,
in unsigned long aPayloadBufLength);
/**
* Let the connection start reading the data.
*/
void startReading();
/**
* Keep reading the data until there is nothing to read.
*/
void drainSocketData();
/**
* Transport-level security information (if any)
*/
[must_use] readonly attribute nsISupports securityInfo;
};
/**
* nsIWebSocketConnectionListener
*
* The listener used to receive the status update or incoming data.
*/
[scriptable, uuid(1c6ab15b-8a0c-4d76-81f8-326a6e0bcb90)]
interface nsIWebSocketConnectionListener : nsISupports
{
void onError(in nsresult aStatus);
void onTCPClosed();
void onDataReceived([array, size_is(dataLength)]in uint8_t data,
in unsigned long dataLength);
};

Просмотреть файл

@ -0,0 +1,231 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "nsWebSocketConnection.h"
#include "WebSocketLog.h"
#include "nsIOService.h"
#include "nsISocketTransport.h"
#include "nsSocketTransportService2.h"
NS_IMPL_ISUPPORTS(nsWebSocketConnection, nsIWebSocketConnection,
nsIInputStreamCallback, nsIOutputStreamCallback)
nsWebSocketConnection::nsWebSocketConnection(
nsISocketTransport* aTransport, nsIAsyncInputStream* aInputStream,
nsIAsyncOutputStream* aOutputStream)
: mTransport(aTransport),
mSocketIn(aInputStream),
mSocketOut(aOutputStream),
mWriteOffset(0),
mStartReadingCalled(false) {}
NS_IMETHODIMP
nsWebSocketConnection::Init(nsIWebSocketConnectionListener* aListener,
nsIEventTarget* aEventTarget) {
NS_ENSURE_ARG_POINTER(aListener);
NS_ENSURE_ARG_POINTER(aEventTarget);
MOZ_ASSERT_IF(nsIOService::UseSocketProcess(), XRE_IsSocketProcess());
MOZ_ASSERT_IF(!nsIOService::UseSocketProcess(), XRE_IsParentProcess());
mListener = aListener;
mEventTarget = aEventTarget;
if (!mTransport) {
return NS_ERROR_FAILURE;
}
if (XRE_IsParentProcess()) {
nsCOMPtr<nsIInterfaceRequestor> callbacks = do_QueryInterface(mListener);
mTransport->SetSecurityCallbacks(callbacks);
} else {
// TODO: deal with security callbacks in bug 1512479
mTransport->SetSecurityCallbacks(nullptr);
}
return mTransport->SetEventSink(nullptr, nullptr);
}
NS_IMETHODIMP
nsWebSocketConnection::Close() {
if (mTransport) {
mTransport->SetSecurityCallbacks(nullptr);
mTransport->SetEventSink(nullptr, nullptr);
mTransport->Close(NS_BASE_STREAM_CLOSED);
mTransport = nullptr;
}
if (mSocketIn) {
if (mStartReadingCalled) {
mSocketIn->AsyncWait(nullptr, 0, 0, nullptr);
}
mSocketIn = nullptr;
}
if (mSocketOut) {
mSocketOut->AsyncWait(nullptr, 0, 0, nullptr);
mSocketOut = nullptr;
}
mListener = nullptr;
return NS_OK;
}
NS_IMETHODIMP
nsWebSocketConnection::EnqueueOutputData(const uint8_t* aHdrBuf,
uint32_t aHdrBufLength,
const uint8_t* aPayloadBuf,
uint32_t aPayloadBufLength) {
LOG(("nsWebSocketConnection::EnqueueOutputData %p\n", this));
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
nsTArray<uint8_t> data;
data.AppendElements(aHdrBuf, aHdrBufLength);
data.AppendElements(aPayloadBuf, aPayloadBufLength);
mOutputQueue.emplace_back(std::move(data));
if (mSocketOut) {
mSocketOut->AsyncWait(this, 0, 0, mEventTarget);
}
return NS_OK;
}
NS_IMETHODIMP
nsWebSocketConnection::StartReading() {
if (!mSocketIn) {
return NS_ERROR_NOT_AVAILABLE;
}
MOZ_ASSERT(!mStartReadingCalled, "StartReading twice");
mStartReadingCalled = true;
return mSocketIn->AsyncWait(this, 0, 0, mEventTarget);
}
NS_IMETHODIMP
nsWebSocketConnection::DrainSocketData() {
MOZ_ASSERT(OnSocketThread());
if (!mSocketIn || !mListener) {
return NS_ERROR_NOT_AVAILABLE;
}
// If we leave any data unconsumed (including the tcp fin) a RST will be
// generated The right thing to do here is shutdown(SHUT_WR) and then wait a
// little while to see if any data comes in.. but there is no reason to delay
// things for that when the websocket handshake is supposed to guarantee a
// quiet connection except for that fin.
char buffer[512];
uint32_t count = 0;
uint32_t total = 0;
nsresult rv;
do {
total += count;
rv = mSocketIn->Read(buffer, 512, &count);
if (rv != NS_BASE_STREAM_WOULD_BLOCK && (NS_FAILED(rv) || count == 0)) {
mListener->OnTCPClosed();
}
} while (NS_SUCCEEDED(rv) && count > 0 && total < 32000);
return NS_OK;
}
NS_IMETHODIMP
nsWebSocketConnection::GetSecurityInfo(nsISupports** aSecurityInfo) {
LOG(("nsWebSocketConnection::GetSecurityInfo() %p\n", this));
MOZ_ASSERT(NS_IsMainThread(), "not main thread");
if (mTransport) {
if (NS_FAILED(mTransport->GetSecurityInfo(aSecurityInfo)))
*aSecurityInfo = nullptr;
}
return NS_OK;
}
NS_IMETHODIMP
nsWebSocketConnection::OnInputStreamReady(nsIAsyncInputStream* aStream) {
LOG(("nsWebSocketConnection::OnInputStreamReady() %p\n", this));
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!mSocketIn) // did we we clean up the socket after scheduling InputReady?
return NS_OK;
if (!mListener) return NS_OK;
// this is after the http upgrade - so we are speaking websockets
uint8_t buffer[2048];
uint32_t count;
nsresult rv;
do {
rv = mSocketIn->Read((char*)buffer, 2048, &count);
LOG(("nsWebSocketConnection::OnInputStreamReady: read %u rv %" PRIx32 "\n",
count, static_cast<uint32_t>(rv)));
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketIn->AsyncWait(this, 0, 0, mEventTarget);
return NS_OK;
}
if (NS_FAILED(rv)) {
mListener->OnError(rv);
return rv;
}
if (count == 0) {
mListener->OnError(NS_BASE_STREAM_CLOSED);
return NS_OK;
}
rv = mListener->OnDataReceived(buffer, count);
if (NS_FAILED(rv)) {
mListener->OnError(rv);
return rv;
}
} while (NS_SUCCEEDED(rv) && mSocketIn && mListener);
return NS_OK;
}
NS_IMETHODIMP
nsWebSocketConnection::OnOutputStreamReady(nsIAsyncOutputStream* aStream) {
LOG(("nsWebSocketConnection::OnOutputStreamReady() %p\n", this));
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
if (!mListener) return NS_OK;
while (!mOutputQueue.empty()) {
const OutputData& data = mOutputQueue.front();
char* buffer = reinterpret_cast<char*>(
const_cast<uint8_t*>(data.GetData().Elements())) +
mWriteOffset;
uint32_t toWrite = data.GetData().Length() - mWriteOffset;
uint32_t wrote = 0;
nsresult rv = mSocketOut->Write(buffer, toWrite, &wrote);
if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
mSocketOut->AsyncWait(this, 0, 0, mEventTarget);
return NS_OK;
}
if (NS_FAILED(rv)) {
LOG(("nsWebSocketConnection::OnOutputStreamReady %p failed %u\n", this,
static_cast<uint32_t>(rv)));
mListener->OnError(rv);
return NS_OK;
}
mWriteOffset += wrote;
if (toWrite == wrote) {
mWriteOffset = 0;
mOutputQueue.pop_front();
}
}
return NS_OK;
}

Просмотреть файл

@ -0,0 +1,66 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set sw=2 ts=8 et tw=80 : */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef mozilla_net_nsWebSocketConnection_h
#define mozilla_net_nsWebSocketConnection_h
#include <list>
#include "nsISupports.h"
#include "nsIStreamListener.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsIWebSocketConnection.h"
class nsISocketTransport;
namespace mozilla {
namespace net {
class nsWebSocketConnection : public nsIWebSocketConnection,
public nsIInputStreamCallback,
public nsIOutputStreamCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIWEBSOCKETCONNECTION
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIOUTPUTSTREAMCALLBACK
explicit nsWebSocketConnection(nsISocketTransport* aTransport,
nsIAsyncInputStream* aInputStream,
nsIAsyncOutputStream* aOutputStream);
private:
virtual ~nsWebSocketConnection() = default;
class OutputData {
public:
explicit OutputData(nsTArray<uint8_t>&& aData) : mData(std::move(aData)) {
MOZ_COUNT_CTOR(OutputData);
}
~OutputData() { MOZ_COUNT_DTOR(OutputData); }
const nsTArray<uint8_t>& GetData() const { return mData; }
private:
nsTArray<uint8_t> mData;
};
nsCOMPtr<nsIWebSocketConnectionListener> mListener;
nsCOMPtr<nsISocketTransport> mTransport;
nsCOMPtr<nsIAsyncInputStream> mSocketIn;
nsCOMPtr<nsIAsyncOutputStream> mSocketOut;
nsCOMPtr<nsIEventTarget> mEventTarget;
size_t mWriteOffset;
std::list<OutputData> mOutputQueue;
bool mStartReadingCalled;
};
} // namespace net
} // namespace mozilla
#endif // mozilla_net_nsWebSocketConnection_h