Bug 676439 - Websocket Binary Message support: Necko changes. r=mcmanus

This commit is contained in:
Jason Duell 2011-12-15 15:20:17 -08:00
Родитель c9c9cf8f9a
Коммит 3757351c07
8 изменённых файлов: 307 добавлений и 146 удалений

Просмотреть файл

@ -44,6 +44,7 @@ include protocol PBrowser;
include "mozilla/net/NeckoMessageUtils.h"; include "mozilla/net/NeckoMessageUtils.h";
using IPC::URI; using IPC::URI;
using IPC::InputStream;
namespace mozilla { namespace mozilla {
namespace net { namespace net {
@ -58,6 +59,7 @@ parent:
Close(PRUint16 code, nsCString reason); Close(PRUint16 code, nsCString reason);
SendMsg(nsCString aMsg); SendMsg(nsCString aMsg);
SendBinaryMsg(nsCString aMsg); SendBinaryMsg(nsCString aMsg);
SendBinaryStream(InputStream aStream, PRUint32 aLength);
DeleteSelf(); DeleteSelf();

Просмотреть файл

@ -66,6 +66,7 @@
#include "nsStringStream.h" #include "nsStringStream.h"
#include "nsAlgorithm.h" #include "nsAlgorithm.h"
#include "nsProxyRelease.h" #include "nsProxyRelease.h"
#include "nsNetUtil.h"
#include "plbase64.h" #include "plbase64.h"
#include "prmem.h" #include "prmem.h"
@ -91,10 +92,6 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocketChannel,
nsIInterfaceRequestor, nsIInterfaceRequestor,
nsIChannelEventSink) nsIChannelEventSink)
// Use this fake ptr so the Fin message stays in sequence in the
// main transmit queue
#define kFinMessage (reinterpret_cast<nsCString *>(0x01))
// An implementation of draft-ietf-hybi-thewebsocketprotocol-08 // An implementation of draft-ietf-hybi-thewebsocketprotocol-08
#define SEC_WEBSOCKET_VERSION "8" #define SEC_WEBSOCKET_VERSION "8"
@ -113,6 +110,10 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocketChannel,
// some helper classes // some helper classes
//-----------------------------------------------------------------------------
// CallOnMessageAvailable
//-----------------------------------------------------------------------------
class CallOnMessageAvailable : public nsIRunnable class CallOnMessageAvailable : public nsIRunnable
{ {
public: public:
@ -125,7 +126,7 @@ public:
mData(aData), mData(aData),
mLen(aLen) {} mLen(aLen) {}
NS_SCRIPTABLE NS_IMETHOD Run() NS_IMETHOD Run()
{ {
if (mLen < 0) if (mLen < 0)
mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData);
@ -143,6 +144,10 @@ private:
}; };
NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnMessageAvailable, nsIRunnable) NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnMessageAvailable, nsIRunnable)
//-----------------------------------------------------------------------------
// CallOnStop
//-----------------------------------------------------------------------------
class CallOnStop : public nsIRunnable class CallOnStop : public nsIRunnable
{ {
public: public:
@ -153,7 +158,7 @@ public:
: mChannel(aChannel), : mChannel(aChannel),
mData(aData) {} mData(aData) {}
NS_SCRIPTABLE NS_IMETHOD Run() NS_IMETHOD Run()
{ {
mChannel->mListener->OnStop(mChannel->mContext, mData); mChannel->mListener->OnStop(mChannel->mContext, mData);
return NS_OK; return NS_OK;
@ -167,6 +172,10 @@ private:
}; };
NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable) NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable)
//-----------------------------------------------------------------------------
// CallOnServerClose
//-----------------------------------------------------------------------------
class CallOnServerClose : public nsIRunnable class CallOnServerClose : public nsIRunnable
{ {
public: public:
@ -179,7 +188,7 @@ public:
mCode(aCode), mCode(aCode),
mReason(aReason) {} mReason(aReason) {}
NS_SCRIPTABLE NS_IMETHOD Run() NS_IMETHOD Run()
{ {
mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason); mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason);
return NS_OK; return NS_OK;
@ -194,6 +203,10 @@ private:
}; };
NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnServerClose, nsIRunnable) NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnServerClose, nsIRunnable)
//-----------------------------------------------------------------------------
// CallAcknowledge
//-----------------------------------------------------------------------------
class CallAcknowledge : public nsIRunnable class CallAcknowledge : public nsIRunnable
{ {
public: public:
@ -204,7 +217,7 @@ public:
: mChannel(aChannel), : mChannel(aChannel),
mSize(aSize) {} mSize(aSize) {}
NS_SCRIPTABLE NS_IMETHOD Run() NS_IMETHOD Run()
{ {
LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize)); LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize));
mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize); mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize);
@ -219,34 +232,145 @@ private:
}; };
NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable) NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable)
class nsPostMessage : public nsIRunnable //-----------------------------------------------------------------------------
// OutboundMessage
//-----------------------------------------------------------------------------
enum WsMsgType {
kMsgTypeString = 0,
kMsgTypeBinaryString,
kMsgTypeStream,
kMsgTypePing,
kMsgTypePong,
kMsgTypeFin
};
static const char* msgNames[] = {
"text",
"binaryString",
"binaryStream",
"ping",
"pong",
"close"
};
class OutboundMessage
{ {
public: public:
NS_DECL_ISUPPORTS OutboundMessage(WsMsgType type, nsCString *str)
: mMsgType(type)
nsPostMessage(WebSocketChannel *aChannel,
nsCString *aData,
PRInt32 aDataLen)
: mChannel(aChannel),
mData(aData),
mDataLen(aDataLen) {}
NS_SCRIPTABLE NS_IMETHOD Run()
{ {
if (mData) MOZ_COUNT_CTOR(OutboundMessage);
mChannel->SendMsgInternal(mData, mDataLen); mMsg.pString = str;
mLength = str ? str->Length() : 0;
}
OutboundMessage(nsIInputStream *stream, PRUint32 length)
: mMsgType(kMsgTypeStream), mLength(length)
{
MOZ_COUNT_CTOR(OutboundMessage);
mMsg.pStream = stream;
mMsg.pStream->AddRef();
}
~OutboundMessage() {
MOZ_COUNT_DTOR(OutboundMessage);
switch (mMsgType) {
case kMsgTypeString:
case kMsgTypeBinaryString:
case kMsgTypePing:
case kMsgTypePong:
delete mMsg.pString;
break;
case kMsgTypeStream:
// for now this only gets hit if msg deleted w/o being sent
if (mMsg.pStream) {
mMsg.pStream->Close();
mMsg.pStream->Release();
}
break;
case kMsgTypeFin:
break; // do-nothing: avoid compiler warning
}
}
WsMsgType GetMsgType() const { return mMsgType; }
PRInt32 Length() const { return mLength; }
PRUint8* BeginWriting() {
NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream,
"Stream should have been converted to string by now");
return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginWriting() : nsnull);
}
PRUint8* BeginReading() {
NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream,
"Stream should have been converted to string by now");
return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginReading() : nsnull);
}
nsresult ConvertStreamToString()
{
NS_ABORT_IF_FALSE(mMsgType == kMsgTypeStream, "Not a stream!");
#ifdef DEBUG
// Make sure we got correct length from Blob
PRUint32 bytes;
mMsg.pStream->Available(&bytes);
NS_ASSERTION(bytes == mLength, "Stream length != blob length!");
#endif
nsAutoPtr<nsCString> temp(new nsCString());
nsresult rv = NS_ReadInputStreamToString(mMsg.pStream, *temp, mLength);
NS_ENSURE_SUCCESS(rv, rv);
mMsg.pStream->Close();
mMsg.pStream->Release();
mMsg.pString = temp.forget();
mMsgType = kMsgTypeBinaryString;
return NS_OK; return NS_OK;
} }
private: private:
~nsPostMessage() {} union {
nsCString *pString;
nsRefPtr<WebSocketChannel> mChannel; nsIInputStream *pStream;
nsCString *mData; } mMsg;
PRInt32 mDataLen; WsMsgType mMsgType;
PRUint32 mLength;
}; };
NS_IMPL_THREADSAFE_ISUPPORTS1(nsPostMessage, nsIRunnable)
//-----------------------------------------------------------------------------
// OutboundEnqueuer
//-----------------------------------------------------------------------------
class OutboundEnqueuer : public nsIRunnable
{
public:
NS_DECL_ISUPPORTS
OutboundEnqueuer(WebSocketChannel *aChannel, OutboundMessage *aMsg)
: mChannel(aChannel), mMessage(aMsg) {}
NS_IMETHOD Run()
{
mChannel->EnqueueOutgoingMessage(mChannel->mOutgoingMessages, mMessage);
return NS_OK;
}
private:
~OutboundEnqueuer() {}
nsRefPtr<WebSocketChannel> mChannel;
OutboundMessage *mMessage;
};
NS_IMPL_THREADSAFE_ISUPPORTS1(OutboundEnqueuer, nsIRunnable)
//-----------------------------------------------------------------------------
// nsWSAdmissionManager
//-----------------------------------------------------------------------------
// Section 5.1 requires that a client rate limit its connects to a single // Section 5.1 requires that a client rate limit its connects to a single
// TCP session in the CONNECTING state (i.e. anything before the 101 upgrade // TCP session in the CONNECTING state (i.e. anything before the 101 upgrade
@ -409,10 +533,14 @@ private:
PRInt32 mConnectedCount; PRInt32 mConnectedCount;
}; };
//-----------------------------------------------------------------------------
// nsWSCompression
//
// similar to nsDeflateConverter except for the mandatory FLUSH calls // similar to nsDeflateConverter except for the mandatory FLUSH calls
// required by websocket and the absence of the deflate termination // required by websocket and the absence of the deflate termination
// block which is appropriate because it would create data bytes after // block which is appropriate because it would create data bytes after
// sending the websockets CLOSE message. // sending the websockets CLOSE message.
//-----------------------------------------------------------------------------
class nsWSCompression class nsWSCompression
{ {
@ -534,7 +662,9 @@ private:
static nsWSAdmissionManager *sWebSocketAdmissions = nsnull; static nsWSAdmissionManager *sWebSocketAdmissions = nsnull;
//-----------------------------------------------------------------------------
// WebSocketChannel // WebSocketChannel
//-----------------------------------------------------------------------------
WebSocketChannel::WebSocketChannel() : WebSocketChannel::WebSocketChannel() :
mCloseTimeout(20000), mCloseTimeout(20000),
@ -1053,6 +1183,9 @@ WebSocketChannel::ProcessInput(PRUint8 *buffer, PRUint32 count)
void void
WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len) WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len)
{ {
if (!data || len == 0)
return;
// Optimally we want to apply the mask 32 bits at a time, // Optimally we want to apply the mask 32 bits at a time,
// but the buffer might not be alligned. So we first deal with // but the buffer might not be alligned. So we first deal with
// 0 to 3 bytes of preamble individually // 0 to 3 bytes of preamble individually
@ -1089,19 +1222,15 @@ WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len)
void void
WebSocketChannel::GeneratePing() WebSocketChannel::GeneratePing()
{ {
LOG(("WebSocketChannel::GeneratePing() %p\n", this));
nsCString *buf = new nsCString(); nsCString *buf = new nsCString();
buf->Assign("PING"); buf->Assign("PING");
mOutgoingPingMessages.Push(new OutboundMessage(buf)); EnqueueOutgoingMessage(mOutgoingPingMessages,
OnOutputStreamReady(mSocketOut); new OutboundMessage(kMsgTypePing, buf));
} }
void void
WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len) WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len)
{ {
LOG(("WebSocketChannel::GeneratePong() %p [%p %u]\n", this, payload, len));
nsCString *buf = new nsCString(); nsCString *buf = new nsCString();
buf->SetLength(len); buf->SetLength(len);
if (buf->Length() < len) { if (buf->Length() < len) {
@ -1111,27 +1240,25 @@ WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len)
} }
memcpy(buf->BeginWriting(), payload, len); memcpy(buf->BeginWriting(), payload, len);
mOutgoingPongMessages.Push(new OutboundMessage(buf)); EnqueueOutgoingMessage(mOutgoingPongMessages,
OnOutputStreamReady(mSocketOut); new OutboundMessage(kMsgTypePong, buf));
} }
void void
WebSocketChannel::SendMsgInternal(nsCString *aMsg, WebSocketChannel::EnqueueOutgoingMessage(nsDeque &aQueue,
PRInt32 aDataLen) OutboundMessage *aMsg)
{ {
LOG(("WebSocketChannel::SendMsgInternal %p [%p len=%d]\n", this, aMsg,
aDataLen));
NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread");
if (aMsg == kFinMessage) {
mOutgoingMessages.Push(new OutboundMessage()); LOG(("WebSocketChannel::EnqueueOutgoingMessage %p "
} else if (aDataLen < 0) { "queueing msg %p [type=%s len=%d]\n",
mOutgoingMessages.Push(new OutboundMessage(aMsg)); this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length()));
} else {
mOutgoingMessages.Push(new OutboundMessage(aMsg, aDataLen)); aQueue.Push(aMsg);
}
OnOutputStreamReady(mSocketOut); OnOutputStreamReady(mSocketOut);
} }
PRUint16 PRUint16
WebSocketChannel::ResultToCloseCode(nsresult resultCode) WebSocketChannel::ResultToCloseCode(nsresult resultCode)
{ {
@ -1155,34 +1282,42 @@ WebSocketChannel::PrimeNewOutgoingMessage()
NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread");
NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress"); NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress");
bool isPong = false; nsresult rv = NS_OK;
bool isPing = false;
mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront(); mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront();
if (mCurrentOut) { if (mCurrentOut) {
isPong = true; NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePong,
"Not pong message!");
} else { } else {
mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront(); mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront();
if (mCurrentOut) if (mCurrentOut)
isPing = true; NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePing,
"Not ping message!");
else else
mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront(); mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront();
} }
if (!mCurrentOut) if (!mCurrentOut)
return; return;
WsMsgType msgType = mCurrentOut->GetMsgType();
LOG(("WebSocketChannel::PrimeNewOutgoingMessage "
"%p found queued msg %p [type=%s len=%d]\n",
this, mCurrentOut, msgNames[msgType], mCurrentOut->Length()));
mCurrentOutSent = 0; mCurrentOutSent = 0;
mHdrOut = mOutHeader; mHdrOut = mOutHeader;
PRUint8 *payload = nsnull; PRUint8 *payload = nsnull;
if (mCurrentOut->IsControl() && !isPing && !isPong) {
if (msgType == kMsgTypeFin) {
// This is a demand to create a close message // This is a demand to create a close message
if (mClientClosed) { if (mClientClosed) {
PrimeNewOutgoingMessage(); PrimeNewOutgoingMessage();
return; return;
} }
LOG(("WebSocketChannel:: PrimeNewOutgoingMessage() found close request\n"));
mClientClosed = 1; mClientClosed = 1;
mOutHeader[0] = kFinalFragBit | kClose; mOutHeader[0] = kFinalFragBit | kClose;
mOutHeader[1] = 0x02; // payload len = 2, maybe more for reason mOutHeader[1] = 0x02; // payload len = 2, maybe more for reason
@ -1220,7 +1355,6 @@ WebSocketChannel::PrimeNewOutgoingMessage()
StopSession(mStopOnClose); StopSession(mStopOnClose);
} else { } else {
/* wait for reciprocal close from server */ /* wait for reciprocal close from server */
nsresult rv;
mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv);
if (NS_SUCCEEDED(rv)) { if (NS_SUCCEEDED(rv)) {
mCloseTimer->InitWithCallback(this, mCloseTimeout, mCloseTimer->InitWithCallback(this, mCloseTimeout,
@ -1230,20 +1364,36 @@ WebSocketChannel::PrimeNewOutgoingMessage()
} }
} }
} else { } else {
if (isPong) { switch (msgType) {
LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found pong request\n")); case kMsgTypePong:
mOutHeader[0] = kFinalFragBit | kPong; mOutHeader[0] = kFinalFragBit | kPong;
} else if (isPing) { break;
LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found ping request\n")); case kMsgTypePing:
mOutHeader[0] = kFinalFragBit | kPing; mOutHeader[0] = kFinalFragBit | kPing;
} else if (mCurrentOut->BinaryLen() < 0) { break;
LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " case kMsgTypeString:
"found queued text message len %d\n", mCurrentOut->Length()));
mOutHeader[0] = kFinalFragBit | kText; mOutHeader[0] = kFinalFragBit | kText;
} else { break;
LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " case kMsgTypeStream:
"found queued binary message len %d\n", mCurrentOut->Length())); // HACK ALERT: read in entire stream into string.
// Will block socket transport thread if file is blocking.
// TODO: bug 704447: don't block socket thread!
rv = mCurrentOut->ConvertStreamToString();
if (NS_FAILED(rv)) {
AbortSession(rv);
return;
}
// Now we're a binary string
msgType = kMsgTypeBinaryString;
// no break: fall down into binary string case
case kMsgTypeBinaryString:
mOutHeader[0] = kFinalFragBit | kBinary; mOutHeader[0] = kFinalFragBit | kBinary;
break;
case kMsgTypeFin:
NS_ABORT_IF_FALSE(false, "unreachable"); // avoid compiler warning
break;
} }
if (mCurrentOut->Length() < 126) { if (mCurrentOut->Length() < 126) {
@ -1266,7 +1416,7 @@ WebSocketChannel::PrimeNewOutgoingMessage()
NS_ABORT_IF_FALSE(payload, "payload offset not found"); NS_ABORT_IF_FALSE(payload, "payload offset not found");
// Perfom the sending mask. never use a zero mask // Perform the sending mask. Never use a zero mask
PRUint32 mask; PRUint32 mask;
do { do {
PRUint8 *buffer; PRUint8 *buffer;
@ -1508,8 +1658,9 @@ WebSocketChannel::AbortSession(nsresult reason)
if (mTransport && reason != NS_BASE_STREAM_CLOSED && if (mTransport && reason != NS_BASE_STREAM_CLOSED &&
!mRequestedClose && !mClientClosed && !mServerClosed) { !mRequestedClose && !mClientClosed && !mServerClosed) {
mRequestedClose = 1; mRequestedClose = 1;
mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), mSocketThread->Dispatch(
nsIEventTarget::DISPATCH_NORMAL); new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)),
nsIEventTarget::DISPATCH_NORMAL);
mStopOnClose = reason; mStopOnClose = reason;
} else { } else {
StopSession(reason); StopSession(reason);
@ -2120,57 +2271,64 @@ WebSocketChannel::Close(PRUint16 code, const nsACString & reason)
mRequestedClose = 1; mRequestedClose = 1;
mScriptCloseReason = reason; mScriptCloseReason = reason;
mScriptCloseCode = code; mScriptCloseCode = code;
return mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), return mSocketThread->Dispatch(
nsIEventTarget::DISPATCH_NORMAL); new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)),
nsIEventTarget::DISPATCH_NORMAL);
} }
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannel::SendMsg(const nsACString &aMsg) WebSocketChannel::SendMsg(const nsACString &aMsg)
{ {
LOG(("WebSocketChannel::SendMsg() %p\n", this)); LOG(("WebSocketChannel::SendMsg() %p\n", this));
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
if (mRequestedClose) { return SendMsgCommon(&aMsg, false, aMsg.Length());
LOG(("WebSocketChannel:: SendMsg when closed error\n"));
return NS_ERROR_UNEXPECTED;
}
if (mStopped) {
LOG(("WebSocketChannel:: SendMsg when stopped error\n"));
return NS_ERROR_NOT_CONNECTED;
}
return mSocketThread->Dispatch(
new nsPostMessage(this, new nsCString(aMsg), -1),
nsIEventTarget::DISPATCH_NORMAL);
} }
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannel::SendBinaryMsg(const nsACString &aMsg) WebSocketChannel::SendBinaryMsg(const nsACString &aMsg)
{ {
LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length())); LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length()));
return SendMsgCommon(&aMsg, true, aMsg.Length());
}
NS_IMETHODIMP
WebSocketChannel::SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength)
{
LOG(("WebSocketChannel::SendBinaryStream() %p\n", this));
return SendMsgCommon(nsnull, true, aLength, aStream);
}
nsresult
WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary,
PRUint32 aLength, nsIInputStream *aStream)
{
NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread");
if (mRequestedClose) { if (mRequestedClose) {
LOG(("WebSocketChannel:: SendBinaryMsg when closed error\n")); LOG(("WebSocketChannel:: Error: send when closed\n"));
return NS_ERROR_UNEXPECTED; return NS_ERROR_UNEXPECTED;
} }
if (mStopped) { if (mStopped) {
LOG(("WebSocketChannel:: SendBinaryMsg when stopped error\n")); LOG(("WebSocketChannel:: Error: send when stopped\n"));
return NS_ERROR_NOT_CONNECTED; return NS_ERROR_NOT_CONNECTED;
} }
return mSocketThread->Dispatch(new nsPostMessage(this, new nsCString(aMsg), return mSocketThread->Dispatch(
aMsg.Length()), aStream ? new OutboundEnqueuer(this, new OutboundMessage(aStream, aLength))
nsIEventTarget::DISPATCH_NORMAL); : new OutboundEnqueuer(this,
new OutboundMessage(aIsBinary ? kMsgTypeBinaryString
: kMsgTypeString,
new nsCString(*aMsg))),
nsIEventTarget::DISPATCH_NORMAL);
} }
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport, WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport,
nsIAsyncInputStream *aSocketIn, nsIAsyncInputStream *aSocketIn,
nsIAsyncOutputStream *aSocketOut) nsIAsyncOutputStream *aSocketOut)
{ {
LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n", LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n",
this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest)); this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest));
@ -2201,7 +2359,7 @@ WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport,
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannel::OnStartRequest(nsIRequest *aRequest, WebSocketChannel::OnStartRequest(nsIRequest *aRequest,
nsISupports *aContext) nsISupports *aContext)
{ {
LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n", LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n",
this, aRequest, aContext, mRecvdHttpUpgradeTransport)); this, aRequest, aContext, mRecvdHttpUpgradeTransport));

Просмотреть файл

@ -67,7 +67,8 @@
namespace mozilla { namespace net { namespace mozilla { namespace net {
class nsPostMessage; class OutboundMessage;
class OutboundEnqueuer;
class nsWSAdmissionManager; class nsWSAdmissionManager;
class nsWSCompression; class nsWSCompression;
class CallOnMessageAvailable; class CallOnMessageAvailable;
@ -106,6 +107,7 @@ public:
NS_IMETHOD Close(PRUint16 aCode, const nsACString & aReason); NS_IMETHOD Close(PRUint16 aCode, const nsACString & aReason);
NS_IMETHOD SendMsg(const nsACString &aMsg); NS_IMETHOD SendMsg(const nsACString &aMsg);
NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); NS_IMETHOD SendBinaryMsg(const nsACString &aMsg);
NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 length);
NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
WebSocketChannel(); WebSocketChannel();
@ -131,14 +133,19 @@ protected:
virtual ~WebSocketChannel(); virtual ~WebSocketChannel();
private: private:
friend class nsPostMessage; friend class OutboundEnqueuer;
friend class nsWSAdmissionManager; friend class nsWSAdmissionManager;
friend class CallOnMessageAvailable; friend class CallOnMessageAvailable;
friend class CallOnStop; friend class CallOnStop;
friend class CallOnServerClose; friend class CallOnServerClose;
friend class CallAcknowledge; friend class CallAcknowledge;
void SendMsgInternal(nsCString *aMsg, PRInt32 datalen); // Common send code for binary + text msgs
nsresult SendMsgCommon(const nsACString *aMsg, bool isBinary,
PRUint32 length, nsIInputStream *aStream = NULL);
void EnqueueOutgoingMessage(nsDeque &aQueue, OutboundMessage *aMsg);
void PrimeNewOutgoingMessage(); void PrimeNewOutgoingMessage();
void GeneratePong(PRUint8 *payload, PRUint32 len); void GeneratePong(PRUint8 *payload, PRUint32 len);
void GeneratePing(); void GeneratePing();
@ -163,48 +170,6 @@ private:
PRUint32 UpdateReadBuffer(PRUint8 *buffer, PRUint32 count, PRUint32 UpdateReadBuffer(PRUint8 *buffer, PRUint32 count,
PRUint32 accumulatedFragments); PRUint32 accumulatedFragments);
class OutboundMessage
{
public:
OutboundMessage (nsCString *str)
: mMsg(str), mIsControl(false), mBinaryLen(-1)
{ MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
OutboundMessage (nsCString *str, PRInt32 dataLen)
: mMsg(str), mIsControl(false), mBinaryLen(dataLen)
{ MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
OutboundMessage ()
: mMsg(nsnull), mIsControl(true), mBinaryLen(-1)
{ MOZ_COUNT_CTOR(WebSocketOutboundMessage); }
~OutboundMessage()
{
MOZ_COUNT_DTOR(WebSocketOutboundMessage);
delete mMsg;
}
bool IsControl() { return mIsControl; }
const nsCString *Msg() { return mMsg; }
PRInt32 BinaryLen() { return mBinaryLen; }
PRInt32 Length()
{
if (mBinaryLen >= 0)
return mBinaryLen;
return mMsg ? mMsg->Length() : 0;
}
PRUint8 *BeginWriting() {
return (PRUint8 *)(mMsg ? mMsg->BeginWriting() : nsnull);
}
PRUint8 *BeginReading() {
return (PRUint8 *)(mMsg ? mMsg->BeginReading() : nsnull);
}
private:
nsCString *mMsg;
bool mIsControl;
PRInt32 mBinaryLen;
};
nsCOMPtr<nsIEventTarget> mSocketThread; nsCOMPtr<nsIEventTarget> mSocketThread;
nsCOMPtr<nsIHttpChannelInternal> mChannel; nsCOMPtr<nsIHttpChannelInternal> mChannel;
@ -234,7 +199,7 @@ private:
const static PRInt32 kLingeringCloseTimeout = 1000; const static PRInt32 kLingeringCloseTimeout = 1000;
const static PRInt32 kLingeringCloseThreshold = 50; const static PRInt32 kLingeringCloseThreshold = 50;
PRUint32 mMaxConcurrentConnections; PRInt32 mMaxConcurrentConnections;
PRUint32 mRecvdHttpOnStartRequest : 1; PRUint32 mRecvdHttpOnStartRequest : 1;
PRUint32 mRecvdHttpUpgradeTransport : 1; PRUint32 mRecvdHttpUpgradeTransport : 1;

Просмотреть файл

@ -405,6 +405,17 @@ WebSocketChannelChild::SendBinaryMsg(const nsACString &aMsg)
return NS_OK; return NS_OK;
} }
NS_IMETHODIMP
WebSocketChannelChild::SendBinaryStream(nsIInputStream *aStream,
PRUint32 aLength)
{
LOG(("WebSocketChannelChild::SendBinaryStream() %p\n", this));
if (!mIPCOpen || !SendSendBinaryStream(IPC::InputStream(aStream), aLength))
return NS_ERROR_UNEXPECTED;
return NS_OK;
}
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo) WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo)
{ {

Просмотреть файл

@ -60,14 +60,13 @@ class WebSocketChannelChild : public BaseWebSocketChannel,
// nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us
// //
NS_SCRIPTABLE NS_IMETHOD AsyncOpen(nsIURI *aURI, NS_IMETHOD AsyncOpen(nsIURI *aURI, const nsACString &aOrigin,
const nsACString &aOrigin, nsIWebSocketListener *aListener, nsISupports *aContext);
nsIWebSocketListener *aListener, NS_IMETHOD Close(PRUint16 code, const nsACString & reason);
nsISupports *aContext); NS_IMETHOD SendMsg(const nsACString &aMsg);
NS_SCRIPTABLE NS_IMETHOD Close(PRUint16 code, const nsACString & reason); NS_IMETHOD SendBinaryMsg(const nsACString &aMsg);
NS_SCRIPTABLE NS_IMETHOD SendMsg(const nsACString &aMsg); NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength);
NS_SCRIPTABLE NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
NS_SCRIPTABLE NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo);
void AddIPDLReference(); void AddIPDLReference();
void ReleaseIPDLReference(); void ReleaseIPDLReference();

Просмотреть файл

@ -137,6 +137,18 @@ WebSocketChannelParent::RecvSendBinaryMsg(const nsCString& aMsg)
return true; return true;
} }
bool
WebSocketChannelParent::RecvSendBinaryStream(const InputStream& aStream,
const PRUint32& aLength)
{
LOG(("WebSocketChannelParent::RecvSendBinaryStream() %p\n", this));
if (mChannel) {
nsresult rv = mChannel->SendBinaryStream(aStream, aLength);
NS_ENSURE_SUCCESS(rv, true);
}
return true;
}
NS_IMETHODIMP NS_IMETHODIMP
WebSocketChannelParent::GetInterface(const nsIID & iid, void **result NS_OUTPARAM) WebSocketChannelParent::GetInterface(const nsIID & iid, void **result NS_OUTPARAM)
{ {

Просмотреть файл

@ -70,6 +70,8 @@ class WebSocketChannelParent : public PWebSocketParent,
bool RecvClose(const PRUint16 & code, const nsCString & reason); bool RecvClose(const PRUint16 & code, const nsCString & reason);
bool RecvSendMsg(const nsCString& aMsg); bool RecvSendMsg(const nsCString& aMsg);
bool RecvSendBinaryMsg(const nsCString& aMsg); bool RecvSendBinaryMsg(const nsCString& aMsg);
bool RecvSendBinaryStream(const InputStream& aStream,
const PRUint32& aLength);
bool RecvDeleteSelf(); bool RecvDeleteSelf();
void ActorDestroy(ActorDestroyReason why); void ActorDestroy(ActorDestroyReason why);

Просмотреть файл

@ -41,10 +41,14 @@ interface nsIURI;
interface nsIInterfaceRequestor; interface nsIInterfaceRequestor;
interface nsILoadGroup; interface nsILoadGroup;
interface nsIWebSocketListener; interface nsIWebSocketListener;
interface nsIInputStream;
#include "nsISupports.idl" #include "nsISupports.idl"
[scriptable, uuid(e8ae0371-c28f-4d61-b257-514e014a4686)] /**
* You probably want nsI{Moz}WebSocket.idl
*/
[uuid(bb69e5d7-d9cd-4aab-9abe-98f80cf8b8b8)]
interface nsIWebSocketChannel : nsISupports interface nsIWebSocketChannel : nsISupports
{ {
/** /**
@ -140,4 +144,12 @@ interface nsIWebSocketChannel : nsISupports
* @param aMsg the data to send * @param aMsg the data to send
*/ */
void sendBinaryMsg(in ACString aMsg); void sendBinaryMsg(in ACString aMsg);
/**
* Use to send a binary stream (Blob) to Websocket peer.
*
* @param aStream The input stream to be sent.
*/
void sendBinaryStream(in nsIInputStream aStream,
in unsigned long length);
}; };