From 3757351c0795ccc2c65f82fddad7dc5330b7d9da Mon Sep 17 00:00:00 2001 From: Jason Duell Date: Thu, 15 Dec 2011 15:20:17 -0800 Subject: [PATCH] Bug 676439 - Websocket Binary Message support: Necko changes. r=mcmanus --- netwerk/protocol/websocket/PWebSocket.ipdl | 2 + .../protocol/websocket/WebSocketChannel.cpp | 340 +++++++++++++----- netwerk/protocol/websocket/WebSocketChannel.h | 57 +-- .../websocket/WebSocketChannelChild.cpp | 11 + .../websocket/WebSocketChannelChild.h | 15 +- .../websocket/WebSocketChannelParent.cpp | 12 + .../websocket/WebSocketChannelParent.h | 2 + .../websocket/nsIWebSocketChannel.idl | 14 +- 8 files changed, 307 insertions(+), 146 deletions(-) diff --git a/netwerk/protocol/websocket/PWebSocket.ipdl b/netwerk/protocol/websocket/PWebSocket.ipdl index e32cc17dc19..e9e92537cd2 100644 --- a/netwerk/protocol/websocket/PWebSocket.ipdl +++ b/netwerk/protocol/websocket/PWebSocket.ipdl @@ -44,6 +44,7 @@ include protocol PBrowser; include "mozilla/net/NeckoMessageUtils.h"; using IPC::URI; +using IPC::InputStream; namespace mozilla { namespace net { @@ -58,6 +59,7 @@ parent: Close(PRUint16 code, nsCString reason); SendMsg(nsCString aMsg); SendBinaryMsg(nsCString aMsg); + SendBinaryStream(InputStream aStream, PRUint32 aLength); DeleteSelf(); diff --git a/netwerk/protocol/websocket/WebSocketChannel.cpp b/netwerk/protocol/websocket/WebSocketChannel.cpp index 65a514c8d4b..ff571c35bbc 100644 --- a/netwerk/protocol/websocket/WebSocketChannel.cpp +++ b/netwerk/protocol/websocket/WebSocketChannel.cpp @@ -66,6 +66,7 @@ #include "nsStringStream.h" #include "nsAlgorithm.h" #include "nsProxyRelease.h" +#include "nsNetUtil.h" #include "plbase64.h" #include "prmem.h" @@ -91,10 +92,6 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocketChannel, nsIInterfaceRequestor, nsIChannelEventSink) -// Use this fake ptr so the Fin message stays in sequence in the -// main transmit queue -#define kFinMessage (reinterpret_cast(0x01)) - // An implementation of draft-ietf-hybi-thewebsocketprotocol-08 #define SEC_WEBSOCKET_VERSION "8" @@ -113,6 +110,10 @@ NS_IMPL_THREADSAFE_ISUPPORTS11(WebSocketChannel, // some helper classes +//----------------------------------------------------------------------------- +// CallOnMessageAvailable +//----------------------------------------------------------------------------- + class CallOnMessageAvailable : public nsIRunnable { public: @@ -125,7 +126,7 @@ public: mData(aData), mLen(aLen) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { if (mLen < 0) mChannel->mListener->OnMessageAvailable(mChannel->mContext, mData); @@ -143,6 +144,10 @@ private: }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnMessageAvailable, nsIRunnable) +//----------------------------------------------------------------------------- +// CallOnStop +//----------------------------------------------------------------------------- + class CallOnStop : public nsIRunnable { public: @@ -153,7 +158,7 @@ public: : mChannel(aChannel), mData(aData) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { mChannel->mListener->OnStop(mChannel->mContext, mData); return NS_OK; @@ -167,6 +172,10 @@ private: }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnStop, nsIRunnable) +//----------------------------------------------------------------------------- +// CallOnServerClose +//----------------------------------------------------------------------------- + class CallOnServerClose : public nsIRunnable { public: @@ -179,7 +188,7 @@ public: mCode(aCode), mReason(aReason) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { mChannel->mListener->OnServerClose(mChannel->mContext, mCode, mReason); return NS_OK; @@ -194,6 +203,10 @@ private: }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallOnServerClose, nsIRunnable) +//----------------------------------------------------------------------------- +// CallAcknowledge +//----------------------------------------------------------------------------- + class CallAcknowledge : public nsIRunnable { public: @@ -204,7 +217,7 @@ public: : mChannel(aChannel), mSize(aSize) {} - NS_SCRIPTABLE NS_IMETHOD Run() + NS_IMETHOD Run() { LOG(("WebSocketChannel::CallAcknowledge: Size %u\n", mSize)); mChannel->mListener->OnAcknowledge(mChannel->mContext, mSize); @@ -219,34 +232,145 @@ private: }; NS_IMPL_THREADSAFE_ISUPPORTS1(CallAcknowledge, nsIRunnable) -class nsPostMessage : public nsIRunnable +//----------------------------------------------------------------------------- +// OutboundMessage +//----------------------------------------------------------------------------- + +enum WsMsgType { + kMsgTypeString = 0, + kMsgTypeBinaryString, + kMsgTypeStream, + kMsgTypePing, + kMsgTypePong, + kMsgTypeFin +}; + +static const char* msgNames[] = { + "text", + "binaryString", + "binaryStream", + "ping", + "pong", + "close" +}; + +class OutboundMessage { public: - NS_DECL_ISUPPORTS - - nsPostMessage(WebSocketChannel *aChannel, - nsCString *aData, - PRInt32 aDataLen) - : mChannel(aChannel), - mData(aData), - mDataLen(aDataLen) {} - - NS_SCRIPTABLE NS_IMETHOD Run() + OutboundMessage(WsMsgType type, nsCString *str) + : mMsgType(type) { - if (mData) - mChannel->SendMsgInternal(mData, mDataLen); + MOZ_COUNT_CTOR(OutboundMessage); + mMsg.pString = str; + mLength = str ? str->Length() : 0; + } + + OutboundMessage(nsIInputStream *stream, PRUint32 length) + : mMsgType(kMsgTypeStream), mLength(length) + { + MOZ_COUNT_CTOR(OutboundMessage); + mMsg.pStream = stream; + mMsg.pStream->AddRef(); + } + + ~OutboundMessage() { + MOZ_COUNT_DTOR(OutboundMessage); + switch (mMsgType) { + case kMsgTypeString: + case kMsgTypeBinaryString: + case kMsgTypePing: + case kMsgTypePong: + delete mMsg.pString; + break; + case kMsgTypeStream: + // for now this only gets hit if msg deleted w/o being sent + if (mMsg.pStream) { + mMsg.pStream->Close(); + mMsg.pStream->Release(); + } + break; + case kMsgTypeFin: + break; // do-nothing: avoid compiler warning + } + } + + WsMsgType GetMsgType() const { return mMsgType; } + PRInt32 Length() const { return mLength; } + + PRUint8* BeginWriting() { + NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, + "Stream should have been converted to string by now"); + return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginWriting() : nsnull); + } + + PRUint8* BeginReading() { + NS_ABORT_IF_FALSE(mMsgType != kMsgTypeStream, + "Stream should have been converted to string by now"); + return (PRUint8 *)(mMsg.pString ? mMsg.pString->BeginReading() : nsnull); + } + + nsresult ConvertStreamToString() + { + NS_ABORT_IF_FALSE(mMsgType == kMsgTypeStream, "Not a stream!"); + +#ifdef DEBUG + // Make sure we got correct length from Blob + PRUint32 bytes; + mMsg.pStream->Available(&bytes); + NS_ASSERTION(bytes == mLength, "Stream length != blob length!"); +#endif + + nsAutoPtr temp(new nsCString()); + nsresult rv = NS_ReadInputStreamToString(mMsg.pStream, *temp, mLength); + + NS_ENSURE_SUCCESS(rv, rv); + + mMsg.pStream->Close(); + mMsg.pStream->Release(); + mMsg.pString = temp.forget(); + mMsgType = kMsgTypeBinaryString; + return NS_OK; } private: - ~nsPostMessage() {} - - nsRefPtr mChannel; - nsCString *mData; - PRInt32 mDataLen; + union { + nsCString *pString; + nsIInputStream *pStream; + } mMsg; + WsMsgType mMsgType; + PRUint32 mLength; }; -NS_IMPL_THREADSAFE_ISUPPORTS1(nsPostMessage, nsIRunnable) +//----------------------------------------------------------------------------- +// OutboundEnqueuer +//----------------------------------------------------------------------------- + +class OutboundEnqueuer : public nsIRunnable +{ +public: + NS_DECL_ISUPPORTS + + OutboundEnqueuer(WebSocketChannel *aChannel, OutboundMessage *aMsg) + : mChannel(aChannel), mMessage(aMsg) {} + + NS_IMETHOD Run() + { + mChannel->EnqueueOutgoingMessage(mChannel->mOutgoingMessages, mMessage); + return NS_OK; + } + +private: + ~OutboundEnqueuer() {} + + nsRefPtr mChannel; + OutboundMessage *mMessage; +}; +NS_IMPL_THREADSAFE_ISUPPORTS1(OutboundEnqueuer, nsIRunnable) + +//----------------------------------------------------------------------------- +// nsWSAdmissionManager +//----------------------------------------------------------------------------- // Section 5.1 requires that a client rate limit its connects to a single // TCP session in the CONNECTING state (i.e. anything before the 101 upgrade @@ -409,10 +533,14 @@ private: PRInt32 mConnectedCount; }; +//----------------------------------------------------------------------------- +// nsWSCompression +// // similar to nsDeflateConverter except for the mandatory FLUSH calls // required by websocket and the absence of the deflate termination // block which is appropriate because it would create data bytes after // sending the websockets CLOSE message. +//----------------------------------------------------------------------------- class nsWSCompression { @@ -534,7 +662,9 @@ private: static nsWSAdmissionManager *sWebSocketAdmissions = nsnull; +//----------------------------------------------------------------------------- // WebSocketChannel +//----------------------------------------------------------------------------- WebSocketChannel::WebSocketChannel() : mCloseTimeout(20000), @@ -1053,6 +1183,9 @@ WebSocketChannel::ProcessInput(PRUint8 *buffer, PRUint32 count) void WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len) { + if (!data || len == 0) + return; + // Optimally we want to apply the mask 32 bits at a time, // but the buffer might not be alligned. So we first deal with // 0 to 3 bytes of preamble individually @@ -1089,19 +1222,15 @@ WebSocketChannel::ApplyMask(PRUint32 mask, PRUint8 *data, PRUint64 len) void WebSocketChannel::GeneratePing() { - LOG(("WebSocketChannel::GeneratePing() %p\n", this)); - nsCString *buf = new nsCString(); buf->Assign("PING"); - mOutgoingPingMessages.Push(new OutboundMessage(buf)); - OnOutputStreamReady(mSocketOut); + EnqueueOutgoingMessage(mOutgoingPingMessages, + new OutboundMessage(kMsgTypePing, buf)); } void WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len) { - LOG(("WebSocketChannel::GeneratePong() %p [%p %u]\n", this, payload, len)); - nsCString *buf = new nsCString(); buf->SetLength(len); if (buf->Length() < len) { @@ -1111,27 +1240,25 @@ WebSocketChannel::GeneratePong(PRUint8 *payload, PRUint32 len) } memcpy(buf->BeginWriting(), payload, len); - mOutgoingPongMessages.Push(new OutboundMessage(buf)); - OnOutputStreamReady(mSocketOut); + EnqueueOutgoingMessage(mOutgoingPongMessages, + new OutboundMessage(kMsgTypePong, buf)); } void -WebSocketChannel::SendMsgInternal(nsCString *aMsg, - PRInt32 aDataLen) +WebSocketChannel::EnqueueOutgoingMessage(nsDeque &aQueue, + OutboundMessage *aMsg) { - LOG(("WebSocketChannel::SendMsgInternal %p [%p len=%d]\n", this, aMsg, - aDataLen)); NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); - if (aMsg == kFinMessage) { - mOutgoingMessages.Push(new OutboundMessage()); - } else if (aDataLen < 0) { - mOutgoingMessages.Push(new OutboundMessage(aMsg)); - } else { - mOutgoingMessages.Push(new OutboundMessage(aMsg, aDataLen)); - } + + LOG(("WebSocketChannel::EnqueueOutgoingMessage %p " + "queueing msg %p [type=%s len=%d]\n", + this, aMsg, msgNames[aMsg->GetMsgType()], aMsg->Length())); + + aQueue.Push(aMsg); OnOutputStreamReady(mSocketOut); } + PRUint16 WebSocketChannel::ResultToCloseCode(nsresult resultCode) { @@ -1155,34 +1282,42 @@ WebSocketChannel::PrimeNewOutgoingMessage() NS_ABORT_IF_FALSE(PR_GetCurrentThread() == gSocketThread, "not socket thread"); NS_ABORT_IF_FALSE(!mCurrentOut, "Current message in progress"); - bool isPong = false; - bool isPing = false; + nsresult rv = NS_OK; mCurrentOut = (OutboundMessage *)mOutgoingPongMessages.PopFront(); if (mCurrentOut) { - isPong = true; + NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePong, + "Not pong message!"); } else { mCurrentOut = (OutboundMessage *)mOutgoingPingMessages.PopFront(); if (mCurrentOut) - isPing = true; + NS_ABORT_IF_FALSE(mCurrentOut->GetMsgType() == kMsgTypePing, + "Not ping message!"); else mCurrentOut = (OutboundMessage *)mOutgoingMessages.PopFront(); } if (!mCurrentOut) return; + + WsMsgType msgType = mCurrentOut->GetMsgType(); + + LOG(("WebSocketChannel::PrimeNewOutgoingMessage " + "%p found queued msg %p [type=%s len=%d]\n", + this, mCurrentOut, msgNames[msgType], mCurrentOut->Length())); + mCurrentOutSent = 0; mHdrOut = mOutHeader; PRUint8 *payload = nsnull; - if (mCurrentOut->IsControl() && !isPing && !isPong) { + + if (msgType == kMsgTypeFin) { // This is a demand to create a close message if (mClientClosed) { PrimeNewOutgoingMessage(); return; } - LOG(("WebSocketChannel:: PrimeNewOutgoingMessage() found close request\n")); mClientClosed = 1; mOutHeader[0] = kFinalFragBit | kClose; mOutHeader[1] = 0x02; // payload len = 2, maybe more for reason @@ -1220,7 +1355,6 @@ WebSocketChannel::PrimeNewOutgoingMessage() StopSession(mStopOnClose); } else { /* wait for reciprocal close from server */ - nsresult rv; mCloseTimer = do_CreateInstance("@mozilla.org/timer;1", &rv); if (NS_SUCCEEDED(rv)) { mCloseTimer->InitWithCallback(this, mCloseTimeout, @@ -1230,20 +1364,36 @@ WebSocketChannel::PrimeNewOutgoingMessage() } } } else { - if (isPong) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found pong request\n")); + switch (msgType) { + case kMsgTypePong: mOutHeader[0] = kFinalFragBit | kPong; - } else if (isPing) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() found ping request\n")); + break; + case kMsgTypePing: mOutHeader[0] = kFinalFragBit | kPing; - } else if (mCurrentOut->BinaryLen() < 0) { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " - "found queued text message len %d\n", mCurrentOut->Length())); + break; + case kMsgTypeString: mOutHeader[0] = kFinalFragBit | kText; - } else { - LOG(("WebSocketChannel::PrimeNewOutgoingMessage() " - "found queued binary message len %d\n", mCurrentOut->Length())); + break; + case kMsgTypeStream: + // HACK ALERT: read in entire stream into string. + // Will block socket transport thread if file is blocking. + // TODO: bug 704447: don't block socket thread! + rv = mCurrentOut->ConvertStreamToString(); + if (NS_FAILED(rv)) { + AbortSession(rv); + return; + } + // Now we're a binary string + msgType = kMsgTypeBinaryString; + + // no break: fall down into binary string case + + case kMsgTypeBinaryString: mOutHeader[0] = kFinalFragBit | kBinary; + break; + case kMsgTypeFin: + NS_ABORT_IF_FALSE(false, "unreachable"); // avoid compiler warning + break; } if (mCurrentOut->Length() < 126) { @@ -1266,7 +1416,7 @@ WebSocketChannel::PrimeNewOutgoingMessage() NS_ABORT_IF_FALSE(payload, "payload offset not found"); - // Perfom the sending mask. never use a zero mask + // Perform the sending mask. Never use a zero mask PRUint32 mask; do { PRUint8 *buffer; @@ -1508,8 +1658,9 @@ WebSocketChannel::AbortSession(nsresult reason) if (mTransport && reason != NS_BASE_STREAM_CLOSED && !mRequestedClose && !mClientClosed && !mServerClosed) { mRequestedClose = 1; - mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), - nsIEventTarget::DISPATCH_NORMAL); + mSocketThread->Dispatch( + new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)), + nsIEventTarget::DISPATCH_NORMAL); mStopOnClose = reason; } else { StopSession(reason); @@ -2120,57 +2271,64 @@ WebSocketChannel::Close(PRUint16 code, const nsACString & reason) mRequestedClose = 1; mScriptCloseReason = reason; mScriptCloseCode = code; - - return mSocketThread->Dispatch(new nsPostMessage(this, kFinMessage, -1), - nsIEventTarget::DISPATCH_NORMAL); + + return mSocketThread->Dispatch( + new OutboundEnqueuer(this, new OutboundMessage(kMsgTypeFin, nsnull)), + nsIEventTarget::DISPATCH_NORMAL); } NS_IMETHODIMP WebSocketChannel::SendMsg(const nsACString &aMsg) { LOG(("WebSocketChannel::SendMsg() %p\n", this)); - NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); - if (mRequestedClose) { - LOG(("WebSocketChannel:: SendMsg when closed error\n")); - return NS_ERROR_UNEXPECTED; - } - - if (mStopped) { - LOG(("WebSocketChannel:: SendMsg when stopped error\n")); - return NS_ERROR_NOT_CONNECTED; - } - - return mSocketThread->Dispatch( - new nsPostMessage(this, new nsCString(aMsg), -1), - nsIEventTarget::DISPATCH_NORMAL); + return SendMsgCommon(&aMsg, false, aMsg.Length()); } NS_IMETHODIMP WebSocketChannel::SendBinaryMsg(const nsACString &aMsg) { LOG(("WebSocketChannel::SendBinaryMsg() %p len=%d\n", this, aMsg.Length())); + return SendMsgCommon(&aMsg, true, aMsg.Length()); +} + +NS_IMETHODIMP +WebSocketChannel::SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength) +{ + LOG(("WebSocketChannel::SendBinaryStream() %p\n", this)); + + return SendMsgCommon(nsnull, true, aLength, aStream); +} + +nsresult +WebSocketChannel::SendMsgCommon(const nsACString *aMsg, bool aIsBinary, + PRUint32 aLength, nsIInputStream *aStream) +{ NS_ABORT_IF_FALSE(NS_IsMainThread(), "not main thread"); if (mRequestedClose) { - LOG(("WebSocketChannel:: SendBinaryMsg when closed error\n")); + LOG(("WebSocketChannel:: Error: send when closed\n")); return NS_ERROR_UNEXPECTED; } if (mStopped) { - LOG(("WebSocketChannel:: SendBinaryMsg when stopped error\n")); + LOG(("WebSocketChannel:: Error: send when stopped\n")); return NS_ERROR_NOT_CONNECTED; } - return mSocketThread->Dispatch(new nsPostMessage(this, new nsCString(aMsg), - aMsg.Length()), - nsIEventTarget::DISPATCH_NORMAL); + return mSocketThread->Dispatch( + aStream ? new OutboundEnqueuer(this, new OutboundMessage(aStream, aLength)) + : new OutboundEnqueuer(this, + new OutboundMessage(aIsBinary ? kMsgTypeBinaryString + : kMsgTypeString, + new nsCString(*aMsg))), + nsIEventTarget::DISPATCH_NORMAL); } NS_IMETHODIMP WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport, - nsIAsyncInputStream *aSocketIn, - nsIAsyncOutputStream *aSocketOut) + nsIAsyncInputStream *aSocketIn, + nsIAsyncOutputStream *aSocketOut) { LOG(("WebSocketChannel::OnTransportAvailable %p [%p %p %p] rcvdonstart=%d\n", this, aTransport, aSocketIn, aSocketOut, mRecvdHttpOnStartRequest)); @@ -2201,7 +2359,7 @@ WebSocketChannel::OnTransportAvailable(nsISocketTransport *aTransport, NS_IMETHODIMP WebSocketChannel::OnStartRequest(nsIRequest *aRequest, - nsISupports *aContext) + nsISupports *aContext) { LOG(("WebSocketChannel::OnStartRequest(): %p [%p %p] recvdhttpupgrade=%d\n", this, aRequest, aContext, mRecvdHttpUpgradeTransport)); diff --git a/netwerk/protocol/websocket/WebSocketChannel.h b/netwerk/protocol/websocket/WebSocketChannel.h index c00ebef9d99..ed469dd7d95 100644 --- a/netwerk/protocol/websocket/WebSocketChannel.h +++ b/netwerk/protocol/websocket/WebSocketChannel.h @@ -67,7 +67,8 @@ namespace mozilla { namespace net { -class nsPostMessage; +class OutboundMessage; +class OutboundEnqueuer; class nsWSAdmissionManager; class nsWSCompression; class CallOnMessageAvailable; @@ -106,6 +107,7 @@ public: NS_IMETHOD Close(PRUint16 aCode, const nsACString & aReason); NS_IMETHOD SendMsg(const nsACString &aMsg); NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 length); NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); WebSocketChannel(); @@ -131,14 +133,19 @@ protected: virtual ~WebSocketChannel(); private: - friend class nsPostMessage; + friend class OutboundEnqueuer; friend class nsWSAdmissionManager; friend class CallOnMessageAvailable; friend class CallOnStop; friend class CallOnServerClose; friend class CallAcknowledge; - void SendMsgInternal(nsCString *aMsg, PRInt32 datalen); + // Common send code for binary + text msgs + nsresult SendMsgCommon(const nsACString *aMsg, bool isBinary, + PRUint32 length, nsIInputStream *aStream = NULL); + + void EnqueueOutgoingMessage(nsDeque &aQueue, OutboundMessage *aMsg); + void PrimeNewOutgoingMessage(); void GeneratePong(PRUint8 *payload, PRUint32 len); void GeneratePing(); @@ -163,48 +170,6 @@ private: PRUint32 UpdateReadBuffer(PRUint8 *buffer, PRUint32 count, PRUint32 accumulatedFragments); - class OutboundMessage - { - public: - OutboundMessage (nsCString *str) - : mMsg(str), mIsControl(false), mBinaryLen(-1) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - OutboundMessage (nsCString *str, PRInt32 dataLen) - : mMsg(str), mIsControl(false), mBinaryLen(dataLen) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - OutboundMessage () - : mMsg(nsnull), mIsControl(true), mBinaryLen(-1) - { MOZ_COUNT_CTOR(WebSocketOutboundMessage); } - - ~OutboundMessage() - { - MOZ_COUNT_DTOR(WebSocketOutboundMessage); - delete mMsg; - } - - bool IsControl() { return mIsControl; } - const nsCString *Msg() { return mMsg; } - PRInt32 BinaryLen() { return mBinaryLen; } - PRInt32 Length() - { - if (mBinaryLen >= 0) - return mBinaryLen; - return mMsg ? mMsg->Length() : 0; - } - PRUint8 *BeginWriting() { - return (PRUint8 *)(mMsg ? mMsg->BeginWriting() : nsnull); - } - PRUint8 *BeginReading() { - return (PRUint8 *)(mMsg ? mMsg->BeginReading() : nsnull); - } - - private: - nsCString *mMsg; - bool mIsControl; - PRInt32 mBinaryLen; - }; nsCOMPtr mSocketThread; nsCOMPtr mChannel; @@ -234,7 +199,7 @@ private: const static PRInt32 kLingeringCloseTimeout = 1000; const static PRInt32 kLingeringCloseThreshold = 50; - PRUint32 mMaxConcurrentConnections; + PRInt32 mMaxConcurrentConnections; PRUint32 mRecvdHttpOnStartRequest : 1; PRUint32 mRecvdHttpUpgradeTransport : 1; diff --git a/netwerk/protocol/websocket/WebSocketChannelChild.cpp b/netwerk/protocol/websocket/WebSocketChannelChild.cpp index 20ff442e760..94800a837a0 100644 --- a/netwerk/protocol/websocket/WebSocketChannelChild.cpp +++ b/netwerk/protocol/websocket/WebSocketChannelChild.cpp @@ -405,6 +405,17 @@ WebSocketChannelChild::SendBinaryMsg(const nsACString &aMsg) return NS_OK; } +NS_IMETHODIMP +WebSocketChannelChild::SendBinaryStream(nsIInputStream *aStream, + PRUint32 aLength) +{ + LOG(("WebSocketChannelChild::SendBinaryStream() %p\n", this)); + + if (!mIPCOpen || !SendSendBinaryStream(IPC::InputStream(aStream), aLength)) + return NS_ERROR_UNEXPECTED; + return NS_OK; +} + NS_IMETHODIMP WebSocketChannelChild::GetSecurityInfo(nsISupports **aSecurityInfo) { diff --git a/netwerk/protocol/websocket/WebSocketChannelChild.h b/netwerk/protocol/websocket/WebSocketChannelChild.h index 3af6231f853..884a7a9874b 100644 --- a/netwerk/protocol/websocket/WebSocketChannelChild.h +++ b/netwerk/protocol/websocket/WebSocketChannelChild.h @@ -60,14 +60,13 @@ class WebSocketChannelChild : public BaseWebSocketChannel, // nsIWebSocketChannel methods BaseWebSocketChannel didn't implement for us // - NS_SCRIPTABLE NS_IMETHOD AsyncOpen(nsIURI *aURI, - const nsACString &aOrigin, - nsIWebSocketListener *aListener, - nsISupports *aContext); - NS_SCRIPTABLE NS_IMETHOD Close(PRUint16 code, const nsACString & reason); - NS_SCRIPTABLE NS_IMETHOD SendMsg(const nsACString &aMsg); - NS_SCRIPTABLE NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); - NS_SCRIPTABLE NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); + NS_IMETHOD AsyncOpen(nsIURI *aURI, const nsACString &aOrigin, + nsIWebSocketListener *aListener, nsISupports *aContext); + NS_IMETHOD Close(PRUint16 code, const nsACString & reason); + NS_IMETHOD SendMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryMsg(const nsACString &aMsg); + NS_IMETHOD SendBinaryStream(nsIInputStream *aStream, PRUint32 aLength); + NS_IMETHOD GetSecurityInfo(nsISupports **aSecurityInfo); void AddIPDLReference(); void ReleaseIPDLReference(); diff --git a/netwerk/protocol/websocket/WebSocketChannelParent.cpp b/netwerk/protocol/websocket/WebSocketChannelParent.cpp index bd14d4567df..d543f233fb7 100644 --- a/netwerk/protocol/websocket/WebSocketChannelParent.cpp +++ b/netwerk/protocol/websocket/WebSocketChannelParent.cpp @@ -137,6 +137,18 @@ WebSocketChannelParent::RecvSendBinaryMsg(const nsCString& aMsg) return true; } +bool +WebSocketChannelParent::RecvSendBinaryStream(const InputStream& aStream, + const PRUint32& aLength) +{ + LOG(("WebSocketChannelParent::RecvSendBinaryStream() %p\n", this)); + if (mChannel) { + nsresult rv = mChannel->SendBinaryStream(aStream, aLength); + NS_ENSURE_SUCCESS(rv, true); + } + return true; +} + NS_IMETHODIMP WebSocketChannelParent::GetInterface(const nsIID & iid, void **result NS_OUTPARAM) { diff --git a/netwerk/protocol/websocket/WebSocketChannelParent.h b/netwerk/protocol/websocket/WebSocketChannelParent.h index 7364e736ff2..e115d5b654b 100644 --- a/netwerk/protocol/websocket/WebSocketChannelParent.h +++ b/netwerk/protocol/websocket/WebSocketChannelParent.h @@ -70,6 +70,8 @@ class WebSocketChannelParent : public PWebSocketParent, bool RecvClose(const PRUint16 & code, const nsCString & reason); bool RecvSendMsg(const nsCString& aMsg); bool RecvSendBinaryMsg(const nsCString& aMsg); + bool RecvSendBinaryStream(const InputStream& aStream, + const PRUint32& aLength); bool RecvDeleteSelf(); void ActorDestroy(ActorDestroyReason why); diff --git a/netwerk/protocol/websocket/nsIWebSocketChannel.idl b/netwerk/protocol/websocket/nsIWebSocketChannel.idl index 978f138ace3..03f5a636a96 100644 --- a/netwerk/protocol/websocket/nsIWebSocketChannel.idl +++ b/netwerk/protocol/websocket/nsIWebSocketChannel.idl @@ -41,10 +41,14 @@ interface nsIURI; interface nsIInterfaceRequestor; interface nsILoadGroup; interface nsIWebSocketListener; +interface nsIInputStream; #include "nsISupports.idl" -[scriptable, uuid(e8ae0371-c28f-4d61-b257-514e014a4686)] +/** + * You probably want nsI{Moz}WebSocket.idl + */ +[uuid(bb69e5d7-d9cd-4aab-9abe-98f80cf8b8b8)] interface nsIWebSocketChannel : nsISupports { /** @@ -140,4 +144,12 @@ interface nsIWebSocketChannel : nsISupports * @param aMsg the data to send */ void sendBinaryMsg(in ACString aMsg); + + /** + * Use to send a binary stream (Blob) to Websocket peer. + * + * @param aStream The input stream to be sent. + */ + void sendBinaryStream(in nsIInputStream aStream, + in unsigned long length); };