зеркало из https://github.com/mozilla/gecko-dev.git
3378 строки
112 KiB
C++
3378 строки
112 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
* You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include <algorithm>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#if !defined(__Userspace_os_Windows)
|
|
# include <arpa/inet.h>
|
|
#endif
|
|
// usrsctp.h expects to have errno definitions prior to its inclusion.
|
|
#include <errno.h>
|
|
|
|
#define SCTP_DEBUG 1
|
|
#define SCTP_STDINT_INCLUDE <stdint.h>
|
|
|
|
#ifdef _MSC_VER
|
|
// Disable "warning C4200: nonstandard extension used : zero-sized array in
|
|
// struct/union"
|
|
// ...which the third-party file usrsctp.h runs afoul of.
|
|
# pragma warning(push)
|
|
# pragma warning(disable : 4200)
|
|
#endif
|
|
|
|
#include "usrsctp.h"
|
|
|
|
#ifdef _MSC_VER
|
|
# pragma warning(pop)
|
|
#endif
|
|
|
|
#include "nsServiceManagerUtils.h"
|
|
#include "nsIInputStream.h"
|
|
#include "nsIPrefBranch.h"
|
|
#include "nsIPrefService.h"
|
|
#include "mozilla/Services.h"
|
|
#include "mozilla/Sprintf.h"
|
|
#include "nsProxyRelease.h"
|
|
#include "nsThread.h"
|
|
#include "nsThreadUtils.h"
|
|
#include "nsNetUtil.h"
|
|
#include "nsNetCID.h"
|
|
#include "mozilla/RandomNum.h"
|
|
#include "mozilla/StaticMutex.h"
|
|
#include "mozilla/UniquePtrExtensions.h"
|
|
#include "mozilla/Unused.h"
|
|
#include "mozilla/dom/RTCDataChannelBinding.h"
|
|
#include "mozilla/dom/RTCStatsReportBinding.h"
|
|
#include "mozilla/media/MediaUtils.h"
|
|
#ifdef MOZ_PEERCONNECTION
|
|
# include "transport/runnable_utils.h"
|
|
# include "jsapi/MediaTransportHandler.h"
|
|
# include "mediapacket.h"
|
|
#endif
|
|
|
|
#include "DataChannel.h"
|
|
#include "DataChannelProtocol.h"
|
|
|
|
// Let us turn on and off important assertions in non-debug builds
|
|
#ifdef DEBUG
|
|
# define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
|
|
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
|
|
# define ASSERT_WEBRTC(x) \
|
|
do { \
|
|
if (!(x)) { \
|
|
MOZ_CRASH(); \
|
|
} \
|
|
} while (0)
|
|
#endif
|
|
|
|
namespace mozilla {
|
|
|
|
LazyLogModule gDataChannelLog("DataChannel");
|
|
static LazyLogModule gSCTPLog("SCTP");
|
|
|
|
#define SCTP_LOG(args) \
|
|
MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
|
|
|
|
static void debug_printf(const char* format, ...) {
|
|
va_list ap;
|
|
char buffer[1024];
|
|
|
|
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
|
|
va_start(ap, format);
|
|
#ifdef _WIN32
|
|
if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
|
|
#else
|
|
if (VsprintfLiteral(buffer, format, ap) > 0) {
|
|
#endif
|
|
SCTP_LOG(("%s", buffer));
|
|
}
|
|
va_end(ap);
|
|
}
|
|
}
|
|
|
|
class DataChannelRegistry {
|
|
public:
|
|
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(DataChannelRegistry)
|
|
|
|
static uintptr_t Register(DataChannelConnection* aConnection) {
|
|
StaticMutexAutoLock lock(sInstanceMutex);
|
|
uintptr_t result = EnsureInstance()->RegisterImpl(aConnection);
|
|
DC_DEBUG(
|
|
("Registering connection %p as ulp %p", aConnection, (void*)result));
|
|
return result;
|
|
}
|
|
|
|
static void Deregister(uintptr_t aId) {
|
|
RefPtr<DataChannelRegistry> maybeTrash;
|
|
|
|
{
|
|
StaticMutexAutoLock lock(sInstanceMutex);
|
|
DC_DEBUG(("Deregistering connection ulp = %p", (void*)aId));
|
|
if (NS_WARN_IF(!Instance())) {
|
|
return;
|
|
}
|
|
Instance()->DeregisterImpl(aId);
|
|
if (Instance()->Empty()) {
|
|
// Unset singleton inside mutex lock, but don't call Shutdown until we
|
|
// unlock, since that involves calling into libusrsctp, which invites
|
|
// deadlock.
|
|
maybeTrash = Instance().forget();
|
|
}
|
|
}
|
|
}
|
|
|
|
static RefPtr<DataChannelConnection> Lookup(uintptr_t aId) {
|
|
StaticMutexAutoLock lock(sInstanceMutex);
|
|
if (NS_WARN_IF(!Instance())) {
|
|
return nullptr;
|
|
}
|
|
return Instance()->LookupImpl(aId);
|
|
}
|
|
|
|
private:
|
|
// This is a singleton class, so don't let just anyone create one of these
|
|
DataChannelRegistry() {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mShutdownBlocker = media::ShutdownBlockingTicket::Create(
|
|
u"DataChannelRegistry::mShutdownBlocker"_ns,
|
|
NS_LITERAL_STRING_FROM_CSTRING(__FILE__), __LINE__);
|
|
InitUsrSctp();
|
|
}
|
|
|
|
static RefPtr<DataChannelRegistry>& Instance() {
|
|
static RefPtr<DataChannelRegistry> sRegistry;
|
|
return sRegistry;
|
|
}
|
|
|
|
static RefPtr<DataChannelRegistry>& EnsureInstance() {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
if (!Instance()) {
|
|
Instance() = new DataChannelRegistry();
|
|
}
|
|
return Instance();
|
|
}
|
|
|
|
uintptr_t RegisterImpl(DataChannelConnection* aConnection) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mConnections.emplace(mNextId, aConnection);
|
|
return mNextId++;
|
|
}
|
|
|
|
void DeregisterImpl(uintptr_t aId) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mConnections.erase(aId);
|
|
}
|
|
|
|
bool Empty() const { return mConnections.empty(); }
|
|
|
|
RefPtr<DataChannelConnection> LookupImpl(uintptr_t aId) {
|
|
auto it = mConnections.find(aId);
|
|
if (NS_WARN_IF(it == mConnections.end())) {
|
|
DC_DEBUG(("Can't find connection ulp %p", (void*)aId));
|
|
return nullptr;
|
|
}
|
|
return it->second;
|
|
}
|
|
|
|
virtual ~DataChannelRegistry() {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
|
|
if (NS_WARN_IF(!mConnections.empty())) {
|
|
MOZ_ASSERT(false);
|
|
mConnections.clear();
|
|
}
|
|
|
|
DeinitUsrSctp();
|
|
}
|
|
|
|
#ifdef SCTP_DTLS_SUPPORTED
|
|
static int SctpDtlsOutput(void* addr, void* buffer, size_t length,
|
|
uint8_t tos, uint8_t set_df) {
|
|
uintptr_t id = reinterpret_cast<uintptr_t>(addr);
|
|
RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id);
|
|
if (NS_WARN_IF(!connection) || connection->InShutdown()) {
|
|
return 0;
|
|
}
|
|
return connection->SctpDtlsOutput(addr, buffer, length, tos, set_df);
|
|
}
|
|
#endif
|
|
|
|
void InitUsrSctp() {
|
|
#ifndef MOZ_PEERCONNECTION
|
|
MOZ_CRASH("Trying to use SCTP/DTLS without dom/media/webrtc/transport");
|
|
#endif
|
|
|
|
DC_DEBUG(("Calling usrsctp_init %p", this));
|
|
|
|
usrsctp_init(0, DataChannelRegistry::SctpDtlsOutput, debug_printf);
|
|
|
|
// Set logging to SCTP:LogLevel::Debug to get SCTP debugs
|
|
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
|
|
usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
|
|
}
|
|
|
|
// Do not send ABORTs in response to INITs (1).
|
|
// Do not send ABORTs for received Out of the Blue packets (2).
|
|
usrsctp_sysctl_set_sctp_blackhole(2);
|
|
|
|
// Disable the Explicit Congestion Notification extension (currently not
|
|
// supported by the Firefox code)
|
|
usrsctp_sysctl_set_sctp_ecn_enable(0);
|
|
|
|
// Enable interleaving messages for different streams (incoming)
|
|
// See: https://tools.ietf.org/html/rfc6458#section-8.1.20
|
|
usrsctp_sysctl_set_sctp_default_frag_interleave(2);
|
|
|
|
// Disabling authentication and dynamic address reconfiguration as neither
|
|
// of them are used for data channel and only result in additional code
|
|
// paths being used.
|
|
usrsctp_sysctl_set_sctp_asconf_enable(0);
|
|
usrsctp_sysctl_set_sctp_auth_enable(0);
|
|
}
|
|
|
|
void DeinitUsrSctp() {
|
|
DC_DEBUG(("Calling usrsctp_finish %p", this));
|
|
usrsctp_finish();
|
|
}
|
|
|
|
uintptr_t mNextId = 1;
|
|
std::map<uintptr_t, RefPtr<DataChannelConnection>> mConnections;
|
|
UniquePtr<media::ShutdownBlockingTicket> mShutdownBlocker;
|
|
static StaticMutex sInstanceMutex MOZ_UNANNOTATED;
|
|
};
|
|
|
|
StaticMutex DataChannelRegistry::sInstanceMutex;
|
|
|
|
OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa& info, const uint8_t* data,
|
|
size_t length)
|
|
: mLength(length), mData(data) {
|
|
mInfo = &info;
|
|
mPos = 0;
|
|
}
|
|
|
|
void OutgoingMsg::Advance(size_t offset) {
|
|
mPos += offset;
|
|
if (mPos > mLength) {
|
|
mPos = mLength;
|
|
}
|
|
}
|
|
|
|
BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg& msg) {
|
|
size_t length = msg.GetLeft();
|
|
auto* tmp = new uint8_t[length]; // infallible malloc!
|
|
memcpy(tmp, msg.GetData(), length);
|
|
mLength = length;
|
|
mData = tmp;
|
|
mInfo = new sctp_sendv_spa;
|
|
*mInfo = msg.GetInfo();
|
|
mPos = 0;
|
|
}
|
|
|
|
BufferedOutgoingMsg::~BufferedOutgoingMsg() {
|
|
delete mInfo;
|
|
delete[] mData;
|
|
}
|
|
|
|
static int receive_cb(struct socket* sock, union sctp_sockstore addr,
|
|
void* data, size_t datalen, struct sctp_rcvinfo rcv,
|
|
int flags, void* ulp_info) {
|
|
DC_DEBUG(("In receive_cb, ulp_info=%p", ulp_info));
|
|
uintptr_t id = reinterpret_cast<uintptr_t>(ulp_info);
|
|
RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id);
|
|
if (!connection) {
|
|
// Unfortunately, we can get callbacks after calling
|
|
// usrsctp_close(socket), so we need to simply ignore them if we've
|
|
// already killed the DataChannelConnection object
|
|
DC_DEBUG((
|
|
"Ignoring receive callback for terminated Connection ulp=%p, %zu bytes",
|
|
ulp_info, datalen));
|
|
return 0;
|
|
}
|
|
return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
|
|
}
|
|
|
|
static RefPtr<DataChannelConnection> GetConnectionFromSocket(
|
|
struct socket* sock) {
|
|
struct sockaddr* addrs = nullptr;
|
|
int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
|
|
if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
|
|
return nullptr;
|
|
}
|
|
// usrsctp_getladdrs() returns the addresses bound to this socket, which
|
|
// contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
|
|
// then free the list of addresses once we have the pointer. We only open
|
|
// AF_CONN sockets, and they should all have the sconn_addr set to the
|
|
// pointer that created them, so [0] is as good as any other.
|
|
struct sockaddr_conn* sconn =
|
|
reinterpret_cast<struct sockaddr_conn*>(&addrs[0]);
|
|
uintptr_t id = reinterpret_cast<uintptr_t>(sconn->sconn_addr);
|
|
RefPtr<DataChannelConnection> connection = DataChannelRegistry::Lookup(id);
|
|
usrsctp_freeladdrs(addrs);
|
|
|
|
return connection;
|
|
}
|
|
|
|
// Called when the buffer empties to the threshold value. This is called
|
|
// from SctpDtlsInput() through the sctp stack. SctpDtlsInput() calls
|
|
// usrsctp_conninput() under lock
|
|
static int threshold_event(struct socket* sock, uint32_t sb_free) {
|
|
RefPtr<DataChannelConnection> connection = GetConnectionFromSocket(sock);
|
|
connection->mLock.AssertCurrentThreadOwns();
|
|
if (connection) {
|
|
connection->SendDeferredMessages();
|
|
} else {
|
|
DC_ERROR(("Can't find connection for socket %p", sock));
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
DataChannelConnection::~DataChannelConnection() {
|
|
DC_DEBUG(("Deleting DataChannelConnection %p", (void*)this));
|
|
// This may die on the MainThread, or on the STS thread, or on an
|
|
// sctp thread if we were in a callback when the DOM side shut things down.
|
|
ASSERT_WEBRTC(mState == CLOSED);
|
|
MOZ_ASSERT(!mMasterSocket);
|
|
MOZ_ASSERT(mPending.GetSize() == 0);
|
|
|
|
if (!IsSTSThread()) {
|
|
// We may be on MainThread *or* on an sctp thread (being called from
|
|
// receive_cb() or SctpDtlsOutput())
|
|
if (mInternalIOThread) {
|
|
// Avoid spinning the event thread from here (which if we're mainthread
|
|
// is in the event loop already)
|
|
nsCOMPtr<nsIRunnable> r = WrapRunnable(
|
|
nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::AsyncShutdown);
|
|
Dispatch(r.forget());
|
|
}
|
|
} else {
|
|
// on STS, safe to call shutdown
|
|
if (mInternalIOThread) {
|
|
mInternalIOThread->Shutdown();
|
|
}
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::Destroy() {
|
|
// Though it's probably ok to do this and close the sockets;
|
|
// if we really want it to do true clean shutdowns it can
|
|
// create a dependant Internal object that would remain around
|
|
// until the network shut down the association or timed out.
|
|
DC_DEBUG(("Destroying DataChannelConnection %p", (void*)this));
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
CloseAll();
|
|
|
|
MutexAutoLock lock(mLock);
|
|
// If we had a pending reset, we aren't waiting for it - clear the list so
|
|
// we can deregister this DataChannelConnection without leaking.
|
|
ClearResets();
|
|
|
|
MOZ_ASSERT(mSTS);
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mListener = nullptr;
|
|
// Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
|
|
// the usrsctp_close() calls can move back here (and just proxy the
|
|
// disconnect_all())
|
|
RUN_ON_THREAD(mSTS,
|
|
WrapRunnable(RefPtr<DataChannelConnection>(this),
|
|
&DataChannelConnection::DestroyOnSTS, mSocket,
|
|
mMasterSocket),
|
|
NS_DISPATCH_NORMAL);
|
|
|
|
// These will be released on STS
|
|
mSocket = nullptr;
|
|
mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
|
|
|
|
// We can't get any more *new* callbacks from the SCTP library
|
|
|
|
// All existing callbacks have refs to DataChannelConnection - however,
|
|
// we need to handle their destroying the object off mainthread/STS
|
|
|
|
// nsDOMDataChannel objects have refs to DataChannels that have refs to us
|
|
}
|
|
|
|
void DataChannelConnection::DestroyOnSTS(struct socket* aMasterSocket,
|
|
struct socket* aSocket) {
|
|
if (aSocket && aSocket != aMasterSocket) usrsctp_close(aSocket);
|
|
if (aMasterSocket) usrsctp_close(aMasterSocket);
|
|
|
|
usrsctp_deregister_address(reinterpret_cast<void*>(mId));
|
|
DC_DEBUG(
|
|
("Deregistered %p from the SCTP stack.", reinterpret_cast<void*>(mId)));
|
|
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
|
|
mShutdown = true;
|
|
DC_DEBUG(("Shutting down connection %p, id %p", this, (void*)mId));
|
|
#endif
|
|
|
|
disconnect_all();
|
|
mTransportHandler = nullptr;
|
|
GetMainThreadEventTarget()->Dispatch(NS_NewRunnableFunction(
|
|
"DataChannelConnection::Destroy",
|
|
[id = mId]() { DataChannelRegistry::Deregister(id); }));
|
|
}
|
|
|
|
Maybe<RefPtr<DataChannelConnection>> DataChannelConnection::Create(
|
|
DataChannelConnection::DataConnectionListener* aListener,
|
|
nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler,
|
|
const uint16_t aLocalPort, const uint16_t aNumStreams,
|
|
const Maybe<uint64_t>& aMaxMessageSize) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
|
|
RefPtr<DataChannelConnection> connection = new DataChannelConnection(
|
|
aListener, aTarget, aHandler); // Walks into a bar
|
|
return connection->Init(aLocalPort, aNumStreams, aMaxMessageSize)
|
|
? Some(connection)
|
|
: Nothing();
|
|
}
|
|
|
|
DataChannelConnection::DataChannelConnection(
|
|
DataChannelConnection::DataConnectionListener* aListener,
|
|
nsISerialEventTarget* aTarget, MediaTransportHandler* aHandler)
|
|
: NeckoTargetHolder(aTarget),
|
|
mLock("netwerk::sctp::DataChannelConnection"),
|
|
mListener(aListener),
|
|
mTransportHandler(aHandler) {
|
|
DC_VERBOSE(("Constructor DataChannelConnection=%p, listener=%p", this,
|
|
mListener.get()));
|
|
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
|
|
mShutdown = false;
|
|
#endif
|
|
}
|
|
|
|
bool DataChannelConnection::Init(const uint16_t aLocalPort,
|
|
const uint16_t aNumStreams,
|
|
const Maybe<uint64_t>& aMaxMessageSize) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
|
|
struct sctp_initmsg initmsg;
|
|
struct sctp_assoc_value av;
|
|
struct sctp_event event;
|
|
socklen_t len;
|
|
|
|
uint16_t event_types[] = {
|
|
SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
|
|
SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT,
|
|
SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT,
|
|
SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT,
|
|
SCTP_STREAM_CHANGE_EVENT};
|
|
{
|
|
// MutexAutoLock lock(mLock); Not needed since we're on mainthread always
|
|
mLocalPort = aLocalPort;
|
|
SetMaxMessageSize(aMaxMessageSize.isSome(), aMaxMessageSize.valueOr(0));
|
|
}
|
|
|
|
mId = DataChannelRegistry::Register(this);
|
|
|
|
// XXX FIX! make this a global we get once
|
|
// Find the STS thread
|
|
nsresult rv;
|
|
mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
|
|
MOZ_ASSERT(NS_SUCCEEDED(rv));
|
|
|
|
// Open sctp with a callback
|
|
if ((mMasterSocket = usrsctp_socket(
|
|
AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
|
|
usrsctp_sysctl_get_sctp_sendspace() / 2,
|
|
reinterpret_cast<void*>(mId))) == nullptr) {
|
|
return false;
|
|
}
|
|
|
|
int buf_size = 1024 * 1024;
|
|
|
|
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_RCVBUF,
|
|
(const void*)&buf_size, sizeof(buf_size)) < 0) {
|
|
DC_ERROR(("Couldn't change receive buffer size on SCTP socket"));
|
|
goto error_cleanup;
|
|
}
|
|
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_SNDBUF,
|
|
(const void*)&buf_size, sizeof(buf_size)) < 0) {
|
|
DC_ERROR(("Couldn't change send buffer size on SCTP socket"));
|
|
goto error_cleanup;
|
|
}
|
|
|
|
// Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
|
|
// in associations for normal IO
|
|
if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
|
|
DC_ERROR(("Couldn't set non_blocking on SCTP socket"));
|
|
// We can't handle connect() safely if it will block, not that this will
|
|
// even happen.
|
|
goto error_cleanup;
|
|
}
|
|
|
|
// Make sure when we close the socket, make sure it doesn't call us back
|
|
// again! This would cause it try to use an invalid DataChannelConnection
|
|
// pointer
|
|
struct linger l;
|
|
l.l_onoff = 1;
|
|
l.l_linger = 0;
|
|
if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, (const void*)&l,
|
|
(socklen_t)sizeof(struct linger)) < 0) {
|
|
DC_ERROR(("Couldn't set SO_LINGER on SCTP socket"));
|
|
// unsafe to allow it to continue if this fails
|
|
goto error_cleanup;
|
|
}
|
|
|
|
// XXX Consider disabling this when we add proper SDP negotiation.
|
|
// We may want to leave enabled for supporting 'cloning' of SDP offers, which
|
|
// implies re-use of the same pseudo-port number, or forcing a renegotiation.
|
|
{
|
|
const int option_value = 1;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
|
|
(const void*)&option_value,
|
|
(socklen_t)sizeof(option_value)) < 0) {
|
|
DC_WARN(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
|
|
}
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
|
|
(const void*)&option_value,
|
|
(socklen_t)sizeof(option_value)) < 0) {
|
|
DC_WARN(("Couldn't set SCTP_NODELAY on SCTP socket"));
|
|
}
|
|
}
|
|
|
|
// Set explicit EOR
|
|
{
|
|
const int option_value = 1;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
|
|
(const void*)&option_value,
|
|
(socklen_t)sizeof(option_value)) < 0) {
|
|
DC_ERROR(("*** failed to enable explicit EOR mode %d", errno));
|
|
goto error_cleanup;
|
|
}
|
|
}
|
|
|
|
// Enable ndata
|
|
// TODO: Bug 1381145, enable this once ndata has been deployed
|
|
#if 0
|
|
av.assoc_id = SCTP_FUTURE_ASSOC;
|
|
av.assoc_value = 1;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
|
|
(socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
|
|
DC_ERROR(("*** failed enable ndata errno %d", errno));
|
|
goto error_cleanup;
|
|
}
|
|
#endif
|
|
|
|
av.assoc_id = SCTP_ALL_ASSOC;
|
|
av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
|
|
&av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
|
|
DC_ERROR(("*** failed enable stream reset errno %d", errno));
|
|
goto error_cleanup;
|
|
}
|
|
|
|
/* Enable the events of interest. */
|
|
memset(&event, 0, sizeof(event));
|
|
event.se_assoc_id = SCTP_ALL_ASSOC;
|
|
event.se_on = 1;
|
|
for (unsigned short event_type : event_types) {
|
|
event.se_type = event_type;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event,
|
|
sizeof(event)) < 0) {
|
|
DC_ERROR(("*** failed setsockopt SCTP_EVENT errno %d", errno));
|
|
goto error_cleanup;
|
|
}
|
|
}
|
|
|
|
memset(&initmsg, 0, sizeof(initmsg));
|
|
len = sizeof(initmsg);
|
|
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
|
|
&len) < 0) {
|
|
DC_ERROR(("*** failed getsockopt SCTP_INITMSG"));
|
|
goto error_cleanup;
|
|
}
|
|
DC_DEBUG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
|
|
initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
|
|
initmsg.sinit_num_ostreams = aNumStreams;
|
|
initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
|
|
(socklen_t)sizeof(initmsg)) < 0) {
|
|
DC_ERROR(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
|
|
goto error_cleanup;
|
|
}
|
|
|
|
mSocket = nullptr;
|
|
mSTS->Dispatch(
|
|
NS_NewRunnableFunction("DataChannelConnection::Init", [id = mId]() {
|
|
usrsctp_register_address(reinterpret_cast<void*>(id));
|
|
DC_DEBUG(("Registered %p within the SCTP stack.",
|
|
reinterpret_cast<void*>(id)));
|
|
}));
|
|
|
|
return true;
|
|
|
|
error_cleanup:
|
|
usrsctp_close(mMasterSocket);
|
|
mMasterSocket = nullptr;
|
|
return false;
|
|
}
|
|
|
|
// Only called on MainThread, mMaxMessageSize is read on other threads
|
|
void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet,
|
|
uint64_t aMaxMessageSize) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
MutexAutoLock lock(mLock);
|
|
|
|
if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
|
|
// Don't overwrite already set MMS with default values
|
|
return;
|
|
}
|
|
|
|
mMaxMessageSizeSet = aMaxMessageSizeSet;
|
|
mMaxMessageSize = aMaxMessageSize;
|
|
|
|
nsresult rv;
|
|
nsCOMPtr<nsIPrefService> prefs =
|
|
do_GetService("@mozilla.org/preferences-service;1", &rv);
|
|
if (!NS_WARN_IF(NS_FAILED(rv))) {
|
|
nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
|
|
|
|
if (branch) {
|
|
int32_t temp;
|
|
if (!NS_FAILED(branch->GetIntPref(
|
|
"media.peerconnection.sctp.force_maximum_message_size", &temp))) {
|
|
if (temp >= 0) {
|
|
mMaxMessageSize = (uint64_t)temp;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Fix remote MMS. This code exists, so future implementations of
|
|
// RTCSctpTransport.maxMessageSize can simply provide that value from
|
|
// GetMaxMessageSize.
|
|
|
|
// TODO: Bug 1382779, once resolved, can be increased to
|
|
// min(Uint8ArrayMaxSize, UINT32_MAX)
|
|
// TODO: Bug 1381146, once resolved, can be increased to whatever we support
|
|
// then (hopefully
|
|
// SIZE_MAX)
|
|
if (mMaxMessageSize == 0 ||
|
|
mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
|
|
mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
|
|
}
|
|
|
|
DC_DEBUG(("Maximum message size (outgoing data): %" PRIu64
|
|
" (set=%s, enforced=%s)",
|
|
mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
|
|
aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
|
|
}
|
|
|
|
uint64_t DataChannelConnection::GetMaxMessageSize() {
|
|
MutexAutoLock lock(mLock);
|
|
return mMaxMessageSize;
|
|
}
|
|
|
|
void DataChannelConnection::AppendStatsToReport(
|
|
const UniquePtr<dom::RTCStatsCollection>& aReport,
|
|
const DOMHighResTimeStamp aTimestamp) const {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
nsString temp;
|
|
for (const RefPtr<DataChannel>& chan : mChannels.GetAll()) {
|
|
// If channel is empty, ignore
|
|
if (!chan) {
|
|
continue;
|
|
}
|
|
mozilla::dom::RTCDataChannelStats stats;
|
|
nsString id = u"dc"_ns;
|
|
id.AppendInt(chan->GetStream());
|
|
stats.mId.Construct(id);
|
|
chan->GetLabel(temp);
|
|
stats.mTimestamp.Construct(aTimestamp);
|
|
stats.mType.Construct(mozilla::dom::RTCStatsType::Data_channel);
|
|
stats.mLabel.Construct(temp);
|
|
chan->GetProtocol(temp);
|
|
stats.mProtocol.Construct(temp);
|
|
stats.mDataChannelIdentifier.Construct(chan->GetStream());
|
|
{
|
|
using State = mozilla::dom::RTCDataChannelState;
|
|
State state;
|
|
switch (chan->GetReadyState()) {
|
|
case CONNECTING:
|
|
state = State::Connecting;
|
|
break;
|
|
case OPEN:
|
|
state = State::Open;
|
|
break;
|
|
case CLOSING:
|
|
state = State::Closing;
|
|
break;
|
|
case CLOSED:
|
|
state = State::Closed;
|
|
break;
|
|
default:
|
|
MOZ_ASSERT(false, "Unknown DataChannel state");
|
|
continue;
|
|
};
|
|
stats.mState.Construct(state);
|
|
}
|
|
auto counters = chan->GetTrafficCounters();
|
|
stats.mMessagesSent.Construct(counters.mMessagesSent);
|
|
stats.mBytesSent.Construct(counters.mBytesSent);
|
|
stats.mMessagesReceived.Construct(counters.mMessagesReceived);
|
|
stats.mBytesReceived.Construct(counters.mBytesReceived);
|
|
if (!aReport->mDataChannelStats.AppendElement(stats, fallible)) {
|
|
mozalloc_handle_oom(0);
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifdef MOZ_PEERCONNECTION
|
|
|
|
bool DataChannelConnection::ConnectToTransport(const std::string& aTransportId,
|
|
const bool aClient,
|
|
const uint16_t aLocalPort,
|
|
const uint16_t aRemotePort) {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
MOZ_ASSERT(mMasterSocket,
|
|
"SCTP wasn't initialized before ConnectToTransport!");
|
|
static const auto paramString =
|
|
[](const std::string& tId, const Maybe<bool>& client,
|
|
const uint16_t localPort, const uint16_t remotePort) -> std::string {
|
|
std::ostringstream stream;
|
|
stream << "Transport ID: '" << tId << "', Role: '"
|
|
<< (client ? (client.value() ? "client" : "server") : "")
|
|
<< "', Local Port: '" << localPort << "', Remote Port: '"
|
|
<< remotePort << "'";
|
|
return stream.str();
|
|
};
|
|
|
|
const auto params =
|
|
paramString(aTransportId, Some(aClient), aLocalPort, aRemotePort);
|
|
DC_DEBUG(("ConnectToTransport connecting DTLS transport with parameters: %s",
|
|
params.c_str()));
|
|
|
|
const auto currentReadyState = GetReadyState();
|
|
if (currentReadyState == OPEN) {
|
|
if (aTransportId == mTransportId && mAllocateEven.isSome() &&
|
|
mAllocateEven.value() == aClient && mLocalPort == aLocalPort &&
|
|
mRemotePort == aRemotePort) {
|
|
DC_WARN(
|
|
("Skipping attempt to connect to an already OPEN transport with "
|
|
"identical parameters."));
|
|
return true;
|
|
}
|
|
DC_WARN(
|
|
("Attempting to connect to an already OPEN transport, because "
|
|
"different parameters were provided."));
|
|
DC_WARN(("Original transport parameters: %s",
|
|
paramString(mTransportId, mAllocateEven, mLocalPort, aRemotePort)
|
|
.c_str()));
|
|
DC_WARN(("New transport parameters: %s", params.c_str()));
|
|
}
|
|
if (NS_WARN_IF(aTransportId.empty())) {
|
|
return false;
|
|
}
|
|
|
|
mLocalPort = aLocalPort;
|
|
mRemotePort = aRemotePort;
|
|
SetReadyState(CONNECTING);
|
|
mAllocateEven = Some(aClient);
|
|
|
|
// Could be faster. Probably doesn't matter.
|
|
while (auto channel = mChannels.Get(INVALID_STREAM)) {
|
|
mChannels.Remove(channel);
|
|
channel->mStream = FindFreeStream();
|
|
if (channel->mStream != INVALID_STREAM) {
|
|
mChannels.Insert(channel);
|
|
}
|
|
}
|
|
RUN_ON_THREAD(mSTS,
|
|
WrapRunnable(RefPtr<DataChannelConnection>(this),
|
|
&DataChannelConnection::SetSignals, aTransportId),
|
|
NS_DISPATCH_NORMAL);
|
|
return true;
|
|
}
|
|
|
|
void DataChannelConnection::SetSignals(const std::string& aTransportId) {
|
|
ASSERT_WEBRTC(IsSTSThread());
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
mTransportId = aTransportId;
|
|
}
|
|
mTransportHandler->SignalPacketReceived.connect(
|
|
this, &DataChannelConnection::SctpDtlsInput);
|
|
mTransportHandler->SignalStateChange.connect(
|
|
this, &DataChannelConnection::TransportStateChange);
|
|
// SignalStateChange() doesn't call you with the initial state
|
|
if (mTransportHandler->GetState(mTransportId, false) ==
|
|
TransportLayer::TS_OPEN) {
|
|
DC_DEBUG(("Setting transport signals, dtls already open"));
|
|
CompleteConnect();
|
|
} else {
|
|
DC_DEBUG(("Setting transport signals, dtls not open yet"));
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::TransportStateChange(
|
|
const std::string& aTransportId, TransportLayer::State aState) {
|
|
ASSERT_WEBRTC(IsSTSThread());
|
|
if (aTransportId == mTransportId) {
|
|
if (aState == TransportLayer::TS_OPEN) {
|
|
DC_DEBUG(("Transport is open!"));
|
|
CompleteConnect();
|
|
} else if (aState == TransportLayer::TS_CLOSED ||
|
|
aState == TransportLayer::TS_NONE ||
|
|
aState == TransportLayer::TS_ERROR) {
|
|
DC_DEBUG(("Transport is closed!"));
|
|
Stop();
|
|
}
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::CompleteConnect() {
|
|
MutexAutoLock lock(mLock);
|
|
|
|
DC_DEBUG(("dtls open"));
|
|
ASSERT_WEBRTC(IsSTSThread());
|
|
if (!mMasterSocket) {
|
|
return;
|
|
}
|
|
|
|
struct sockaddr_conn addr;
|
|
memset(&addr, 0, sizeof(addr));
|
|
addr.sconn_family = AF_CONN;
|
|
# if defined(__Userspace_os_Darwin)
|
|
addr.sconn_len = sizeof(addr);
|
|
# endif
|
|
addr.sconn_port = htons(mLocalPort);
|
|
addr.sconn_addr = reinterpret_cast<void*>(mId);
|
|
|
|
DC_DEBUG(("Calling usrsctp_bind"));
|
|
int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr),
|
|
sizeof(addr));
|
|
if (r < 0) {
|
|
DC_ERROR(("usrsctp_bind failed: %d", r));
|
|
} else {
|
|
// This is the remote addr
|
|
addr.sconn_port = htons(mRemotePort);
|
|
DC_DEBUG(("Calling usrsctp_connect"));
|
|
r = usrsctp_connect(
|
|
mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
|
|
if (r >= 0 || errno == EINPROGRESS) {
|
|
struct sctp_paddrparams paddrparams;
|
|
socklen_t opt_len;
|
|
|
|
memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
|
|
memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
|
|
opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
|
|
r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
|
|
&paddrparams, &opt_len);
|
|
if (r < 0) {
|
|
DC_ERROR(("usrsctp_getsockopt failed: %d", r));
|
|
} else {
|
|
// draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4
|
|
// 1200, IPV6 1280
|
|
paddrparams.spp_pathmtu = 1200; // safe for either
|
|
paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
|
|
paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
|
|
opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
|
|
r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP,
|
|
SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
|
|
if (r < 0) {
|
|
DC_ERROR(("usrsctp_getsockopt failed: %d", r));
|
|
} else {
|
|
DC_ERROR(("usrsctp: PMTUD disabled, MTU set to %u",
|
|
paddrparams.spp_pathmtu));
|
|
}
|
|
}
|
|
}
|
|
if (r < 0) {
|
|
if (errno == EINPROGRESS) {
|
|
// non-blocking
|
|
return;
|
|
}
|
|
DC_ERROR(("usrsctp_connect failed: %d", errno));
|
|
SetReadyState(CLOSED);
|
|
} else {
|
|
// We fire ON_CONNECTION via SCTP_COMM_UP when we get that
|
|
return;
|
|
}
|
|
}
|
|
// Note: currently this doesn't actually notify the application
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_CONNECTION, this)));
|
|
}
|
|
|
|
// Process any pending Opens
|
|
void DataChannelConnection::ProcessQueuedOpens() {
|
|
// The nsDeque holds channels with an AddRef applied. Another reference
|
|
// (may) be held by the DOMDataChannel, unless it's been GC'd. No other
|
|
// references should exist.
|
|
|
|
// Can't copy nsDeque's. Move into temp array since any that fail will
|
|
// go back to mPending
|
|
nsRefPtrDeque<DataChannel> temp;
|
|
RefPtr<DataChannel> temp_channel;
|
|
while (nullptr != (temp_channel = mPending.PopFront())) {
|
|
temp.Push(temp_channel.forget());
|
|
}
|
|
|
|
RefPtr<DataChannel> channel;
|
|
|
|
while (nullptr != (channel = temp.PopFront())) {
|
|
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
|
|
DC_DEBUG(("Processing queued open for %p (%u)", channel.get(),
|
|
channel->mStream));
|
|
channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
|
|
// OpenFinish returns a reference itself, so we need to take it can
|
|
// Release it
|
|
channel = OpenFinish(channel.forget()); // may reset the flag and re-push
|
|
} else {
|
|
NS_ASSERTION(
|
|
false,
|
|
"How did a DataChannel get queued without the FINISH_OPEN flag?");
|
|
}
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::SctpDtlsInput(const std::string& aTransportId,
|
|
const MediaPacket& packet) {
|
|
MutexAutoLock lock(mLock);
|
|
if ((packet.type() != MediaPacket::SCTP) || (mTransportId != aTransportId)) {
|
|
return;
|
|
}
|
|
|
|
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
|
|
char* buf;
|
|
|
|
if ((buf = usrsctp_dumppacket((void*)packet.data(), packet.len(),
|
|
SCTP_DUMP_INBOUND)) != nullptr) {
|
|
SCTP_LOG(("%s", buf));
|
|
usrsctp_freedumpbuffer(buf);
|
|
}
|
|
}
|
|
// Pass the data to SCTP
|
|
usrsctp_conninput(reinterpret_cast<void*>(mId), packet.data(), packet.len(),
|
|
0);
|
|
}
|
|
|
|
void DataChannelConnection::SendPacket(std::unique_ptr<MediaPacket>&& packet) {
|
|
mSTS->Dispatch(NS_NewRunnableFunction(
|
|
"DataChannelConnection::SendPacket",
|
|
[this, self = RefPtr<DataChannelConnection>(this),
|
|
packet = std::move(packet)]() mutable {
|
|
// DC_DEBUG(("%p: SCTP/DTLS sent %ld bytes", this, len));
|
|
if (!mTransportId.empty() && mTransportHandler) {
|
|
mTransportHandler->SendPacket(mTransportId, std::move(*packet));
|
|
}
|
|
}));
|
|
}
|
|
|
|
int DataChannelConnection::SctpDtlsOutput(void* addr, void* buffer,
|
|
size_t length, uint8_t tos,
|
|
uint8_t set_df) {
|
|
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
|
|
char* buf;
|
|
|
|
if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) !=
|
|
nullptr) {
|
|
SCTP_LOG(("%s", buf));
|
|
usrsctp_freedumpbuffer(buf);
|
|
}
|
|
}
|
|
|
|
// We're async proxying even if on the STSThread because this is called
|
|
// with internal SCTP locks held in some cases (such as in usrsctp_connect()).
|
|
// SCTP has an option for Apple, on IP connections only, to release at least
|
|
// one of the locks before calling a packet output routine; with changes to
|
|
// the underlying SCTP stack this might remove the need to use an async proxy.
|
|
std::unique_ptr<MediaPacket> packet(new MediaPacket);
|
|
packet->SetType(MediaPacket::SCTP);
|
|
packet->Copy(static_cast<const uint8_t*>(buffer), length);
|
|
|
|
if (NS_IsMainThread() && mDeferSend) {
|
|
mDeferredSend.emplace_back(std::move(packet));
|
|
return 0;
|
|
}
|
|
|
|
SendPacket(std::move(packet));
|
|
return 0; // cheat! Packets can always be dropped later anyways
|
|
}
|
|
#endif
|
|
|
|
#ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
|
|
// listen for incoming associations
|
|
// Blocks! - Don't call this from main thread!
|
|
|
|
bool DataChannelConnection::Listen(unsigned short port) {
|
|
struct sockaddr_in addr;
|
|
socklen_t addr_len;
|
|
|
|
NS_WARNING_ASSERTION(!NS_IsMainThread(),
|
|
"Blocks, do not call from main thread!!!");
|
|
|
|
/* Acting as the 'server' */
|
|
memset((void*)&addr, 0, sizeof(addr));
|
|
# ifdef HAVE_SIN_LEN
|
|
addr.sin_len = sizeof(struct sockaddr_in);
|
|
# endif
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(port);
|
|
addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
DC_DEBUG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
SetReadyState(CONNECTING);
|
|
}
|
|
if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr*>(&addr),
|
|
sizeof(struct sockaddr_in)) < 0) {
|
|
DC_ERROR(("***Failed userspace_bind"));
|
|
return false;
|
|
}
|
|
if (usrsctp_listen(mMasterSocket, 1) < 0) {
|
|
DC_ERROR(("***Failed userspace_listen"));
|
|
return false;
|
|
}
|
|
|
|
DC_DEBUG(("Accepting connection"));
|
|
addr_len = 0;
|
|
if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) ==
|
|
nullptr) {
|
|
DC_ERROR(("***Failed accept"));
|
|
return false;
|
|
}
|
|
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
SetReadyState(OPEN);
|
|
}
|
|
|
|
struct linger l;
|
|
l.l_onoff = 1;
|
|
l.l_linger = 0;
|
|
if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void*)&l,
|
|
(socklen_t)sizeof(struct linger)) < 0) {
|
|
DC_WARN(("Couldn't set SO_LINGER on SCTP socket"));
|
|
}
|
|
|
|
// Notify Connection open
|
|
// XXX We need to make sure connection sticks around until the message is
|
|
// delivered
|
|
DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_CONNECTION, this,
|
|
(DataChannel*)nullptr)));
|
|
return true;
|
|
}
|
|
|
|
// Blocks! - Don't call this from main thread!
|
|
bool DataChannelConnection::Connect(const char* addr, unsigned short port) {
|
|
struct sockaddr_in addr4;
|
|
struct sockaddr_in6 addr6;
|
|
|
|
NS_WARNING_ASSERTION(!NS_IsMainThread(),
|
|
"Blocks, do not call from main thread!!!");
|
|
|
|
/* Acting as the connector */
|
|
DC_DEBUG(("Connecting to %s, port %u", addr, port));
|
|
memset((void*)&addr4, 0, sizeof(struct sockaddr_in));
|
|
memset((void*)&addr6, 0, sizeof(struct sockaddr_in6));
|
|
# ifdef HAVE_SIN_LEN
|
|
addr4.sin_len = sizeof(struct sockaddr_in);
|
|
# endif
|
|
# ifdef HAVE_SIN6_LEN
|
|
addr6.sin6_len = sizeof(struct sockaddr_in6);
|
|
# endif
|
|
addr4.sin_family = AF_INET;
|
|
addr6.sin6_family = AF_INET6;
|
|
addr4.sin_port = htons(port);
|
|
addr6.sin6_port = htons(port);
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
SetReadyState(CONNECTING);
|
|
}
|
|
# if !defined(__Userspace_os_Windows)
|
|
if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
|
|
if (usrsctp_connect(mMasterSocket,
|
|
reinterpret_cast<struct sockaddr*>(&addr6),
|
|
sizeof(struct sockaddr_in6)) < 0) {
|
|
DC_ERROR(("*** Failed userspace_connect"));
|
|
return false;
|
|
}
|
|
} else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
|
|
if (usrsctp_connect(mMasterSocket,
|
|
reinterpret_cast<struct sockaddr*>(&addr4),
|
|
sizeof(struct sockaddr_in)) < 0) {
|
|
DC_ERROR(("*** Failed userspace_connect"));
|
|
return false;
|
|
}
|
|
} else {
|
|
DC_ERROR(("*** Illegal destination address."));
|
|
}
|
|
# else
|
|
{
|
|
struct sockaddr_storage ss;
|
|
int sslen = sizeof(ss);
|
|
|
|
if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET6, nullptr,
|
|
(struct sockaddr*)&ss, &sslen)) {
|
|
addr6.sin6_addr =
|
|
(reinterpret_cast<struct sockaddr_in6*>(&ss))->sin6_addr;
|
|
if (usrsctp_connect(mMasterSocket,
|
|
reinterpret_cast<struct sockaddr*>(&addr6),
|
|
sizeof(struct sockaddr_in6)) < 0) {
|
|
DC_ERROR(("*** Failed userspace_connect"));
|
|
return false;
|
|
}
|
|
} else if (!WSAStringToAddressA(const_cast<char*>(addr), AF_INET, nullptr,
|
|
(struct sockaddr*)&ss, &sslen)) {
|
|
addr4.sin_addr = (reinterpret_cast<struct sockaddr_in*>(&ss))->sin_addr;
|
|
if (usrsctp_connect(mMasterSocket,
|
|
reinterpret_cast<struct sockaddr*>(&addr4),
|
|
sizeof(struct sockaddr_in)) < 0) {
|
|
DC_ERROR(("*** Failed userspace_connect"));
|
|
return false;
|
|
}
|
|
} else {
|
|
DC_ERROR(("*** Illegal destination address."));
|
|
}
|
|
}
|
|
# endif
|
|
|
|
mSocket = mMasterSocket;
|
|
|
|
DC_DEBUG(("connect() succeeded! Entering connected mode"));
|
|
{
|
|
MutexAutoLock lock(mLock);
|
|
SetReadyState(OPEN);
|
|
}
|
|
// Notify Connection open
|
|
// XXX We need to make sure connection sticks around until the message is
|
|
// delivered
|
|
DC_DEBUG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_CONNECTION, this,
|
|
(DataChannel*)nullptr)));
|
|
return true;
|
|
}
|
|
#endif
|
|
|
|
DataChannel* DataChannelConnection::FindChannelByStream(uint16_t stream) {
|
|
return mChannels.Get(stream).get();
|
|
}
|
|
|
|
uint16_t DataChannelConnection::FindFreeStream() {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
uint16_t i, limit;
|
|
|
|
limit = MAX_NUM_STREAMS;
|
|
|
|
MOZ_ASSERT(mAllocateEven.isSome());
|
|
for (i = (*mAllocateEven ? 0 : 1); i < limit; i += 2) {
|
|
if (mChannels.Get(i)) {
|
|
continue;
|
|
}
|
|
|
|
// Verify it's not still in the process of closing
|
|
size_t j;
|
|
for (j = 0; j < mStreamsResetting.Length(); ++j) {
|
|
if (mStreamsResetting[j] == i) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
if (j == mStreamsResetting.Length()) {
|
|
return i;
|
|
}
|
|
}
|
|
return INVALID_STREAM;
|
|
}
|
|
|
|
uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
|
|
RefPtr<DataChannel> channel = mChannels.GetNextChannel(mCurrentStream);
|
|
if (!channel) {
|
|
mCurrentStream = 0;
|
|
} else {
|
|
mCurrentStream = channel->mStream;
|
|
}
|
|
return mCurrentStream;
|
|
}
|
|
|
|
uint32_t DataChannelConnection::GetCurrentStreamIndex() {
|
|
if (!mChannels.Get(mCurrentStream)) {
|
|
// The stream muse have been removed, reset
|
|
DC_DEBUG(("Reset mCurrentChannel"));
|
|
mCurrentStream = 0;
|
|
}
|
|
return mCurrentStream;
|
|
}
|
|
|
|
bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
|
|
struct sctp_status status;
|
|
struct sctp_add_streams sas;
|
|
uint32_t outStreamsNeeded;
|
|
socklen_t len;
|
|
|
|
if (aNeeded + mNegotiatedIdLimit > MAX_NUM_STREAMS) {
|
|
aNeeded = MAX_NUM_STREAMS - mNegotiatedIdLimit;
|
|
}
|
|
if (aNeeded <= 0) {
|
|
return false;
|
|
}
|
|
|
|
len = (socklen_t)sizeof(struct sctp_status);
|
|
if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status,
|
|
&len) < 0) {
|
|
DC_ERROR(("***failed: getsockopt SCTP_STATUS"));
|
|
return false;
|
|
}
|
|
outStreamsNeeded = aNeeded; // number to add
|
|
|
|
// Note: if multiple channel opens happen when we don't have enough space,
|
|
// we'll call RequestMoreStreams() multiple times
|
|
memset(&sas, 0, sizeof(sas));
|
|
sas.sas_instrms = 0;
|
|
sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
|
|
// Doesn't block, we get an event when it succeeds or fails
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
|
|
(socklen_t)sizeof(struct sctp_add_streams)) < 0) {
|
|
if (errno == EALREADY) {
|
|
DC_DEBUG(("Already have %u output streams", outStreamsNeeded));
|
|
return true;
|
|
}
|
|
|
|
DC_ERROR(("***failed: setsockopt ADD errno=%d", errno));
|
|
return false;
|
|
}
|
|
DC_DEBUG(("Requested %u more streams", outStreamsNeeded));
|
|
// We add to mNegotiatedIdLimit when we get a SCTP_STREAM_CHANGE_EVENT and the
|
|
// values are larger than mNegotiatedIdLimit
|
|
return true;
|
|
}
|
|
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendControlMessage(const uint8_t* data, uint32_t len,
|
|
uint16_t stream) {
|
|
struct sctp_sendv_spa info = {0};
|
|
|
|
// General flags
|
|
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
|
|
|
|
// Set stream identifier, protocol identifier and flags
|
|
info.sendv_sndinfo.snd_sid = stream;
|
|
info.sendv_sndinfo.snd_flags = SCTP_EOR;
|
|
info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
|
|
|
|
// Create message instance and send
|
|
// Note: Main-thread IO, but doesn't block
|
|
#if (UINT32_MAX > SIZE_MAX)
|
|
if (len > SIZE_MAX) {
|
|
return EMSGSIZE;
|
|
}
|
|
#endif
|
|
OutgoingMsg msg(info, data, (size_t)len);
|
|
bool buffered;
|
|
int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered, nullptr);
|
|
|
|
// Set pending type (if buffered)
|
|
if (!error && buffered && !mPendingType) {
|
|
mPendingType = PENDING_DCEP;
|
|
}
|
|
return error;
|
|
}
|
|
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendOpenAckMessage(uint16_t stream) {
|
|
struct rtcweb_datachannel_ack ack;
|
|
|
|
memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
|
|
ack.msg_type = DATA_CHANNEL_ACK;
|
|
|
|
return SendControlMessage((const uint8_t*)&ack, sizeof(ack), stream);
|
|
}
|
|
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendOpenRequestMessage(
|
|
const nsACString& label, const nsACString& protocol, uint16_t stream,
|
|
bool unordered, uint16_t prPolicy, uint32_t prValue) {
|
|
const int label_len = label.Length(); // not including nul
|
|
const int proto_len = protocol.Length(); // not including nul
|
|
// careful - request struct include one char for the label
|
|
const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
|
|
label_len + proto_len;
|
|
UniqueFreePtr<struct rtcweb_datachannel_open_request> req(
|
|
(struct rtcweb_datachannel_open_request*)moz_xmalloc(req_size));
|
|
|
|
memset(req.get(), 0, req_size);
|
|
req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
|
|
switch (prPolicy) {
|
|
case SCTP_PR_SCTP_NONE:
|
|
req->channel_type = DATA_CHANNEL_RELIABLE;
|
|
break;
|
|
case SCTP_PR_SCTP_TTL:
|
|
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
|
|
break;
|
|
case SCTP_PR_SCTP_RTX:
|
|
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
|
|
break;
|
|
default:
|
|
return EINVAL;
|
|
}
|
|
if (unordered) {
|
|
// Per the current types, all differ by 0x80 between ordered and unordered
|
|
req->channel_type |=
|
|
0x80; // NOTE: be careful if new types are added in the future
|
|
}
|
|
|
|
req->reliability_param = htonl(prValue);
|
|
req->priority = htons(0); /* XXX: add support */
|
|
req->label_length = htons(label_len);
|
|
req->protocol_length = htons(proto_len);
|
|
memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
|
|
memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
|
|
|
|
// TODO: req_size is an int... that looks hairy
|
|
int error = SendControlMessage((const uint8_t*)req.get(), req_size, stream);
|
|
return error;
|
|
}
|
|
|
|
// XXX This should use a separate thread (outbound queue) which should
|
|
// select() to know when to *try* to send data to the socket again.
|
|
// Alternatively, it can use a timeout, but that's guaranteed to be wrong
|
|
// (just not sure in what direction). We could re-implement NSPR's
|
|
// PR_POLL_WRITE/etc handling... with a lot of work.
|
|
|
|
// Better yet, use the SCTP stack's notifications on buffer state to avoid
|
|
// filling the SCTP's buffers.
|
|
|
|
// returns if we're still blocked (true)
|
|
bool DataChannelConnection::SendDeferredMessages() {
|
|
RefPtr<DataChannel> channel; // we may null out the refs to this
|
|
|
|
// This may block while something is modifying channels, but should not block
|
|
// for IO
|
|
ASSERT_WEBRTC(!NS_IsMainThread());
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
DC_DEBUG(("SendDeferredMessages called, pending type: %d", mPendingType));
|
|
if (!mPendingType) {
|
|
return false;
|
|
}
|
|
|
|
// Send pending control messages
|
|
// Note: If ndata is not active, check if DCEP messages are currently
|
|
// outstanding. These need to
|
|
// be sent first before other streams can be used for sending.
|
|
if (!mBufferedControl.IsEmpty() &&
|
|
(mSendInterleaved || mPendingType == PENDING_DCEP)) {
|
|
if (SendBufferedMessages(mBufferedControl, nullptr)) {
|
|
return true;
|
|
}
|
|
|
|
// Note: There may or may not be pending data messages
|
|
mPendingType = PENDING_DATA;
|
|
}
|
|
|
|
bool blocked = false;
|
|
uint32_t i = GetCurrentStreamIndex();
|
|
uint32_t end = i;
|
|
do {
|
|
channel = mChannels.Get(i);
|
|
// Should already be cleared if closing/closed
|
|
if (!channel || channel->mBufferedData.IsEmpty()) {
|
|
i = UpdateCurrentStreamIndex();
|
|
continue;
|
|
}
|
|
|
|
// Send buffered data messages
|
|
// Warning: This will fail in case ndata is inactive and a previously
|
|
// deallocated data channel has not been closed properly. If you
|
|
// ever see that no messages can be sent on any channel, this is
|
|
// likely the cause (an explicit EOR message partially sent whose
|
|
// remaining chunks are still being waited for).
|
|
size_t written = 0;
|
|
mDeferSend = true;
|
|
blocked = SendBufferedMessages(channel->mBufferedData, &written);
|
|
mDeferSend = false;
|
|
if (written) {
|
|
channel->DecrementBufferedAmount(written);
|
|
}
|
|
|
|
for (auto&& packet : mDeferredSend) {
|
|
MOZ_ASSERT(written);
|
|
SendPacket(std::move(packet));
|
|
}
|
|
mDeferredSend.clear();
|
|
|
|
// Update current stream index
|
|
// Note: If ndata is not active, the outstanding data messages on this
|
|
// stream need to be sent first before other streams can be used for
|
|
// sending.
|
|
if (mSendInterleaved || !blocked) {
|
|
i = UpdateCurrentStreamIndex();
|
|
}
|
|
} while (!blocked && i != end);
|
|
|
|
if (!blocked) {
|
|
mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
|
|
}
|
|
return blocked;
|
|
}
|
|
|
|
// Called with mLock locked!
|
|
// buffer MUST have at least one item!
|
|
// returns if we're still blocked (true)
|
|
bool DataChannelConnection::SendBufferedMessages(
|
|
nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer, size_t* aWritten) {
|
|
do {
|
|
// Re-send message
|
|
int error = SendMsgInternal(*buffer[0], aWritten);
|
|
switch (error) {
|
|
case 0:
|
|
buffer.RemoveElementAt(0);
|
|
break;
|
|
case EAGAIN:
|
|
#if (EAGAIN != EWOULDBLOCK)
|
|
case EWOULDBLOCK:
|
|
#endif
|
|
return true;
|
|
default:
|
|
buffer.RemoveElementAt(0);
|
|
DC_ERROR(("error on sending: %d", error));
|
|
break;
|
|
}
|
|
} while (!buffer.IsEmpty());
|
|
|
|
return false;
|
|
}
|
|
|
|
// Caller must ensure that length <= SIZE_MAX
|
|
void DataChannelConnection::HandleOpenRequestMessage(
|
|
const struct rtcweb_datachannel_open_request* req, uint32_t length,
|
|
uint16_t stream) {
|
|
RefPtr<DataChannel> channel;
|
|
uint32_t prValue;
|
|
uint16_t prPolicy;
|
|
|
|
ASSERT_WEBRTC(!NS_IsMainThread());
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
|
|
ntohs(req->protocol_length);
|
|
if (((size_t)length) != requiredLength) {
|
|
if (((size_t)length) < requiredLength) {
|
|
DC_ERROR(
|
|
("%s: insufficient length: %u, should be %zu. Unable to continue.",
|
|
__FUNCTION__, length, requiredLength));
|
|
return;
|
|
}
|
|
DC_WARN(("%s: Inconsistent length: %u, should be %zu", __FUNCTION__, length,
|
|
requiredLength));
|
|
}
|
|
|
|
DC_DEBUG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length,
|
|
sizeof(*req)));
|
|
|
|
switch (req->channel_type) {
|
|
case DATA_CHANNEL_RELIABLE:
|
|
case DATA_CHANNEL_RELIABLE_UNORDERED:
|
|
prPolicy = SCTP_PR_SCTP_NONE;
|
|
break;
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
|
|
prPolicy = SCTP_PR_SCTP_RTX;
|
|
break;
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
|
|
prPolicy = SCTP_PR_SCTP_TTL;
|
|
break;
|
|
default:
|
|
DC_ERROR(("Unknown channel type %d", req->channel_type));
|
|
/* XXX error handling */
|
|
return;
|
|
}
|
|
prValue = ntohl(req->reliability_param);
|
|
bool ordered = !(req->channel_type & 0x80);
|
|
|
|
if ((channel = FindChannelByStream(stream))) {
|
|
if (!channel->mNegotiated) {
|
|
DC_ERROR(
|
|
("HandleOpenRequestMessage: channel for pre-existing stream "
|
|
"%u that was not externally negotiated. JS is lying to us, or "
|
|
"there's an id collision.",
|
|
stream));
|
|
/* XXX: some error handling */
|
|
} else {
|
|
DC_DEBUG(("Open for externally negotiated channel %u", stream));
|
|
// XXX should also check protocol, maybe label
|
|
if (prPolicy != channel->mPrPolicy || prValue != channel->mPrValue ||
|
|
ordered != channel->mOrdered) {
|
|
DC_WARN(
|
|
("external negotiation mismatch with OpenRequest:"
|
|
"channel %u, policy %u/%u, value %u/%u, ordered %d/%d",
|
|
stream, prPolicy, channel->mPrPolicy, prValue, channel->mPrValue,
|
|
static_cast<int>(ordered), static_cast<int>(channel->mOrdered)));
|
|
}
|
|
}
|
|
return;
|
|
}
|
|
if (stream >= mNegotiatedIdLimit) {
|
|
DC_ERROR(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
|
|
mNegotiatedIdLimit));
|
|
return;
|
|
}
|
|
|
|
nsCString label(
|
|
nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
|
|
nsCString protocol(nsDependentCSubstring(
|
|
&req->label[ntohs(req->label_length)], ntohs(req->protocol_length)));
|
|
|
|
channel =
|
|
new DataChannel(this, stream, DataChannel::OPEN, label, protocol,
|
|
prPolicy, prValue, ordered, false, nullptr, nullptr);
|
|
mChannels.Insert(channel);
|
|
|
|
DC_DEBUG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u", __FUNCTION__,
|
|
channel->mLabel.get(), channel->mProtocol.get(), stream));
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, this, channel)));
|
|
|
|
DC_DEBUG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
|
|
channel.get()));
|
|
channel->AnnounceOpen();
|
|
|
|
// Note that any message can be buffered; SendOpenAckMessage may
|
|
// error later than this check.
|
|
const auto error = SendOpenAckMessage(channel->mStream);
|
|
if (error) {
|
|
DC_ERROR(("SendOpenRequest failed, error = %d", error));
|
|
Dispatch(NS_NewRunnableFunction(
|
|
"DataChannelConnection::HandleOpenRequestMessage",
|
|
[channel, connection = RefPtr<DataChannelConnection>(this)]() {
|
|
// Close the channel on failure
|
|
connection->Close(channel);
|
|
}));
|
|
return;
|
|
}
|
|
DeliverQueuedData(channel->mStream);
|
|
}
|
|
|
|
// NOTE: the updated spec from the IETF says we should set in-order until we
|
|
// receive an ACK. That would make this code moot. Keep it for now for
|
|
// backwards compatibility.
|
|
void DataChannelConnection::DeliverQueuedData(uint16_t stream) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
mQueuedData.RemoveElementsBy([stream, this](const auto& dataItem) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
const bool match = dataItem->mStream == stream;
|
|
if (match) {
|
|
DC_DEBUG(("Delivering queued data for stream %u, length %u", stream,
|
|
dataItem->mLength));
|
|
// Deliver the queued data
|
|
HandleDataMessage(dataItem->mData, dataItem->mLength, dataItem->mPpid,
|
|
dataItem->mStream, dataItem->mFlags);
|
|
}
|
|
return match;
|
|
});
|
|
}
|
|
|
|
// Caller must ensure that length <= SIZE_MAX
|
|
void DataChannelConnection::HandleOpenAckMessage(
|
|
const struct rtcweb_datachannel_ack* ack, uint32_t length,
|
|
uint16_t stream) {
|
|
DataChannel* channel;
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
channel = FindChannelByStream(stream);
|
|
if (NS_WARN_IF(!channel)) {
|
|
return;
|
|
}
|
|
|
|
DC_DEBUG(("OpenAck received for stream %u, waiting=%d", stream,
|
|
(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
|
|
|
|
channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
|
|
}
|
|
|
|
// Caller must ensure that length <= SIZE_MAX
|
|
void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length,
|
|
uint16_t stream) {
|
|
/* XXX: Send an error message? */
|
|
DC_ERROR(("unknown DataChannel message received: %u, len %u on stream %d",
|
|
ppid, length, stream));
|
|
// XXX Log to JS error console if possible
|
|
}
|
|
|
|
uint8_t DataChannelConnection::BufferMessage(nsACString& recvBuffer,
|
|
const void* data, uint32_t length,
|
|
uint32_t ppid, int flags) {
|
|
const char* buffer = (const char*)data;
|
|
uint8_t bufferFlags = 0;
|
|
|
|
if ((flags & MSG_EOR) && ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
|
|
ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
|
|
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
|
|
|
|
// Return directly if nothing has been buffered
|
|
if (recvBuffer.IsEmpty()) {
|
|
return bufferFlags;
|
|
}
|
|
}
|
|
|
|
// Ensure it doesn't blow up our buffer
|
|
// TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the
|
|
// new buffer is capable of holding.
|
|
if (((uint64_t)recvBuffer.Length()) + ((uint64_t)length) >
|
|
WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
|
|
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
|
|
return bufferFlags;
|
|
}
|
|
|
|
// Copy & add to receive buffer
|
|
recvBuffer.Append(buffer, length);
|
|
bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
|
|
return bufferFlags;
|
|
}
|
|
|
|
void DataChannelConnection::HandleDataMessage(const void* data, size_t length,
|
|
uint32_t ppid, uint16_t stream,
|
|
int flags) {
|
|
DataChannel* channel;
|
|
const char* buffer = (const char*)data;
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
channel = FindChannelByStream(stream);
|
|
|
|
// Note: Until we support SIZE_MAX sized messages, we need this check
|
|
#if (SIZE_MAX > UINT32_MAX)
|
|
if (length > UINT32_MAX) {
|
|
DC_ERROR(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32
|
|
")",
|
|
length, UINT32_MAX));
|
|
CloseLocked(channel);
|
|
return;
|
|
}
|
|
#endif
|
|
uint32_t data_length = (uint32_t)length;
|
|
|
|
// XXX A closed channel may trip this... check
|
|
// NOTE: the updated spec from the IETF says we should set in-order until we
|
|
// receive an ACK. That would make this code moot. Keep it for now for
|
|
// backwards compatibility.
|
|
if (!channel) {
|
|
// In the updated 0-RTT open case, the sender can send data immediately
|
|
// after Open, and doesn't set the in-order bit (since we don't have a
|
|
// response or ack). Also, with external negotiation, data can come in
|
|
// before we're told about the external negotiation. We need to buffer
|
|
// data until either a) Open comes in, if the ordering get messed up,
|
|
// or b) the app tells us this channel was externally negotiated. When
|
|
// these occur, we deliver the data.
|
|
|
|
// Since this is rare and non-performance, keep a single list of queued
|
|
// data messages to deliver once the channel opens.
|
|
DC_DEBUG(("Queuing data for stream %u, length %u", stream, data_length));
|
|
// Copies data
|
|
mQueuedData.AppendElement(
|
|
new QueuedDataMessage(stream, ppid, flags, data, data_length));
|
|
return;
|
|
}
|
|
|
|
bool is_binary = true;
|
|
uint8_t bufferFlags;
|
|
int32_t type;
|
|
const char* info = "";
|
|
|
|
if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
|
|
ppid == DATA_CHANNEL_PPID_DOMSTRING) {
|
|
is_binary = false;
|
|
}
|
|
if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
|
|
NS_WARNING("DataChannel message aborted by fragment type change!");
|
|
// TODO: Maybe closing would be better as this is a hard to detect protocol
|
|
// violation?
|
|
channel->mRecvBuffer.Truncate(0);
|
|
}
|
|
channel->mIsRecvBinary = is_binary;
|
|
|
|
// Remaining chunks of previously truncated message (due to the buffer being
|
|
// full)?
|
|
if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
|
|
DC_ERROR(
|
|
("DataChannel: Ignoring partial message of length %u, buffer full and "
|
|
"closing",
|
|
data_length));
|
|
// Only unblock if unordered
|
|
if (!channel->mOrdered && (flags & MSG_EOR)) {
|
|
channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
|
|
}
|
|
}
|
|
|
|
// Buffer message until complete
|
|
bufferFlags =
|
|
BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
|
|
DC_ERROR(
|
|
("DataChannel: Buffered message would become too large to handle, "
|
|
"closing channel"));
|
|
channel->mRecvBuffer.Truncate(0);
|
|
channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
|
|
CloseLocked(channel);
|
|
return;
|
|
}
|
|
if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
|
|
DC_DEBUG(
|
|
("DataChannel: Partial %s message of length %u (total %zu) on channel "
|
|
"id %u",
|
|
is_binary ? "binary" : "string", data_length,
|
|
channel->mRecvBuffer.Length(), channel->mStream));
|
|
return; // Not ready to notify application
|
|
}
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
|
|
data_length = channel->mRecvBuffer.Length();
|
|
}
|
|
|
|
// Complain about large messages (only complain - we can handle it)
|
|
if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
|
|
DC_WARN(
|
|
("DataChannel: Received message of length %u is > announced maximum "
|
|
"message size (%u)",
|
|
data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
|
|
}
|
|
|
|
switch (ppid) {
|
|
case DATA_CHANNEL_PPID_DOMSTRING:
|
|
DC_DEBUG(
|
|
("DataChannel: Received string message of length %u on channel %u",
|
|
data_length, channel->mStream));
|
|
type = DataChannelOnMessageAvailable::ON_DATA_STRING;
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
|
|
info = " (string fragmented)";
|
|
}
|
|
// else send using recvData normally
|
|
|
|
// WebSockets checks IsUTF8() here; we can try to deliver it
|
|
break;
|
|
|
|
case DATA_CHANNEL_PPID_BINARY:
|
|
DC_DEBUG(
|
|
("DataChannel: Received binary message of length %u on channel id %u",
|
|
data_length, channel->mStream));
|
|
type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
|
|
info = " (binary fragmented)";
|
|
}
|
|
|
|
// else send using recvData normally
|
|
break;
|
|
|
|
default:
|
|
NS_ERROR("Unknown data PPID");
|
|
DC_ERROR(("Unknown data PPID %" PRIu32, ppid));
|
|
return;
|
|
}
|
|
|
|
channel->WithTrafficCounters(
|
|
[&data_length](DataChannel::TrafficCounters& counters) {
|
|
counters.mMessagesReceived++;
|
|
counters.mBytesReceived += data_length;
|
|
});
|
|
|
|
// Notify onmessage
|
|
DC_DEBUG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
|
|
(type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING"
|
|
: "BINARY",
|
|
info, channel));
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
|
|
channel->SendOrQueue(new DataChannelOnMessageAvailable(
|
|
type, this, channel, channel->mRecvBuffer));
|
|
channel->mRecvBuffer.Truncate(0);
|
|
} else {
|
|
nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
|
|
channel->SendOrQueue(
|
|
new DataChannelOnMessageAvailable(type, this, channel, recvData));
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleDCEPMessage(const void* buffer, size_t length,
|
|
uint32_t ppid, uint16_t stream,
|
|
int flags) {
|
|
const struct rtcweb_datachannel_open_request* req;
|
|
const struct rtcweb_datachannel_ack* ack;
|
|
|
|
// Note: Until we support SIZE_MAX sized messages, we need this check
|
|
#if (SIZE_MAX > UINT32_MAX)
|
|
if (length > UINT32_MAX) {
|
|
DC_ERROR(("DataChannel: Cannot handle message of size %zu (max=%u)", length,
|
|
UINT32_MAX));
|
|
Stop();
|
|
return;
|
|
}
|
|
#endif
|
|
uint32_t data_length = (uint32_t)length;
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
// Buffer message until complete
|
|
const uint8_t bufferFlags =
|
|
BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
|
|
DC_ERROR(
|
|
("DataChannel: Buffered message would become too large to handle, "
|
|
"closing connection"));
|
|
mRecvBuffer.Truncate(0);
|
|
Stop();
|
|
return;
|
|
}
|
|
if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
|
|
DC_DEBUG(("Buffered partial DCEP message of length %u", data_length));
|
|
return;
|
|
}
|
|
if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
|
|
buffer = reinterpret_cast<const void*>(mRecvBuffer.BeginReading());
|
|
data_length = mRecvBuffer.Length();
|
|
}
|
|
|
|
req = static_cast<const struct rtcweb_datachannel_open_request*>(buffer);
|
|
DC_DEBUG(("Handling DCEP message of length %u", data_length));
|
|
|
|
// Ensure minimum message size (ack is the smallest DCEP message)
|
|
if ((size_t)data_length < sizeof(*ack)) {
|
|
DC_WARN(("Ignored invalid DCEP message (too short)"));
|
|
return;
|
|
}
|
|
|
|
switch (req->msg_type) {
|
|
case DATA_CHANNEL_OPEN_REQUEST:
|
|
// structure includes a possibly-unused char label[1] (in a packed
|
|
// structure)
|
|
if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
|
|
return;
|
|
}
|
|
|
|
HandleOpenRequestMessage(req, data_length, stream);
|
|
break;
|
|
case DATA_CHANNEL_ACK:
|
|
// >= sizeof(*ack) checked above
|
|
|
|
ack = static_cast<const struct rtcweb_datachannel_ack*>(buffer);
|
|
HandleOpenAckMessage(ack, data_length, stream);
|
|
break;
|
|
default:
|
|
HandleUnknownMessage(ppid, data_length, stream);
|
|
break;
|
|
}
|
|
|
|
// Reset buffer
|
|
mRecvBuffer.Truncate(0);
|
|
}
|
|
|
|
// Called with mLock locked!
|
|
void DataChannelConnection::HandleMessage(const void* buffer, size_t length,
|
|
uint32_t ppid, uint16_t stream,
|
|
int flags) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
switch (ppid) {
|
|
case DATA_CHANNEL_PPID_CONTROL:
|
|
HandleDCEPMessage(buffer, length, ppid, stream, flags);
|
|
break;
|
|
case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
|
|
case DATA_CHANNEL_PPID_DOMSTRING:
|
|
case DATA_CHANNEL_PPID_BINARY_PARTIAL:
|
|
case DATA_CHANNEL_PPID_BINARY:
|
|
HandleDataMessage(buffer, length, ppid, stream, flags);
|
|
break;
|
|
default:
|
|
DC_ERROR((
|
|
"Unhandled message of length %zu PPID %u on stream %u received (%s).",
|
|
length, ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
|
|
break;
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleAssociationChangeEvent(
|
|
const struct sctp_assoc_change* sac) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
uint32_t i, n;
|
|
const auto readyState = GetReadyState();
|
|
switch (sac->sac_state) {
|
|
case SCTP_COMM_UP:
|
|
DC_DEBUG(("Association change: SCTP_COMM_UP"));
|
|
if (readyState == CONNECTING) {
|
|
mSocket = mMasterSocket;
|
|
SetReadyState(OPEN);
|
|
|
|
DC_DEBUG(("Negotiated number of incoming streams: %" PRIu16,
|
|
sac->sac_inbound_streams));
|
|
DC_DEBUG(("Negotiated number of outgoing streams: %" PRIu16,
|
|
sac->sac_outbound_streams));
|
|
mNegotiatedIdLimit =
|
|
std::max(mNegotiatedIdLimit,
|
|
static_cast<size_t>(std::max(sac->sac_outbound_streams,
|
|
sac->sac_inbound_streams)));
|
|
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_CONNECTION, this)));
|
|
DC_DEBUG(("DTLS connect() succeeded! Entering connected mode"));
|
|
|
|
// Open any streams pending...
|
|
ProcessQueuedOpens();
|
|
|
|
} else if (readyState == OPEN) {
|
|
DC_DEBUG(("DataConnection Already OPEN"));
|
|
} else {
|
|
DC_ERROR(("Unexpected state: %d", readyState));
|
|
}
|
|
break;
|
|
case SCTP_COMM_LOST:
|
|
DC_DEBUG(("Association change: SCTP_COMM_LOST"));
|
|
// This association is toast, so also close all the channels -- from
|
|
// mainthread!
|
|
Stop();
|
|
break;
|
|
case SCTP_RESTART:
|
|
DC_DEBUG(("Association change: SCTP_RESTART"));
|
|
break;
|
|
case SCTP_SHUTDOWN_COMP:
|
|
DC_DEBUG(("Association change: SCTP_SHUTDOWN_COMP"));
|
|
Stop();
|
|
break;
|
|
case SCTP_CANT_STR_ASSOC:
|
|
DC_DEBUG(("Association change: SCTP_CANT_STR_ASSOC"));
|
|
break;
|
|
default:
|
|
DC_DEBUG(("Association change: UNKNOWN"));
|
|
break;
|
|
}
|
|
DC_DEBUG(("Association change: streams (in/out) = (%u/%u)",
|
|
sac->sac_inbound_streams, sac->sac_outbound_streams));
|
|
|
|
if (NS_WARN_IF(!sac)) {
|
|
return;
|
|
}
|
|
|
|
n = sac->sac_length - sizeof(*sac);
|
|
if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
|
|
if (n > 0) {
|
|
for (i = 0; i < n; ++i) {
|
|
switch (sac->sac_info[i]) {
|
|
case SCTP_ASSOC_SUPPORTS_PR:
|
|
DC_DEBUG(("Supports: PR"));
|
|
break;
|
|
case SCTP_ASSOC_SUPPORTS_AUTH:
|
|
DC_DEBUG(("Supports: AUTH"));
|
|
break;
|
|
case SCTP_ASSOC_SUPPORTS_ASCONF:
|
|
DC_DEBUG(("Supports: ASCONF"));
|
|
break;
|
|
case SCTP_ASSOC_SUPPORTS_MULTIBUF:
|
|
DC_DEBUG(("Supports: MULTIBUF"));
|
|
break;
|
|
case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
|
|
DC_DEBUG(("Supports: RE-CONFIG"));
|
|
break;
|
|
#if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
|
|
case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
|
|
DC_DEBUG(("Supports: NDATA"));
|
|
// TODO: This should probably be set earlier above in 'case
|
|
// SCTP_COMM_UP' but we also need this for 'SCTP_RESTART'.
|
|
mSendInterleaved = true;
|
|
break;
|
|
#endif
|
|
default:
|
|
DC_ERROR(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
} else if (((sac->sac_state == SCTP_COMM_LOST) ||
|
|
(sac->sac_state == SCTP_CANT_STR_ASSOC)) &&
|
|
(n > 0)) {
|
|
DC_DEBUG(("Association: ABORT ="));
|
|
for (i = 0; i < n; ++i) {
|
|
DC_DEBUG((" 0x%02x", sac->sac_info[i]));
|
|
}
|
|
}
|
|
if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
|
|
(sac->sac_state == SCTP_SHUTDOWN_COMP) ||
|
|
(sac->sac_state == SCTP_COMM_LOST)) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandlePeerAddressChangeEvent(
|
|
const struct sctp_paddr_change* spc) {
|
|
const char* addr = "";
|
|
#if !defined(__Userspace_os_Windows)
|
|
char addr_buf[INET6_ADDRSTRLEN];
|
|
struct sockaddr_in* sin;
|
|
struct sockaddr_in6* sin6;
|
|
#endif
|
|
|
|
switch (spc->spc_aaddr.ss_family) {
|
|
case AF_INET:
|
|
#if !defined(__Userspace_os_Windows)
|
|
sin = (struct sockaddr_in*)&spc->spc_aaddr;
|
|
addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
|
|
#endif
|
|
break;
|
|
case AF_INET6:
|
|
#if !defined(__Userspace_os_Windows)
|
|
sin6 = (struct sockaddr_in6*)&spc->spc_aaddr;
|
|
addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
|
|
#endif
|
|
break;
|
|
case AF_CONN:
|
|
addr = "DTLS connection";
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
DC_DEBUG(("Peer address %s is now ", addr));
|
|
switch (spc->spc_state) {
|
|
case SCTP_ADDR_AVAILABLE:
|
|
DC_DEBUG(("SCTP_ADDR_AVAILABLE"));
|
|
break;
|
|
case SCTP_ADDR_UNREACHABLE:
|
|
DC_DEBUG(("SCTP_ADDR_UNREACHABLE"));
|
|
break;
|
|
case SCTP_ADDR_REMOVED:
|
|
DC_DEBUG(("SCTP_ADDR_REMOVED"));
|
|
break;
|
|
case SCTP_ADDR_ADDED:
|
|
DC_DEBUG(("SCTP_ADDR_ADDED"));
|
|
break;
|
|
case SCTP_ADDR_MADE_PRIM:
|
|
DC_DEBUG(("SCTP_ADDR_MADE_PRIM"));
|
|
break;
|
|
case SCTP_ADDR_CONFIRMED:
|
|
DC_DEBUG(("SCTP_ADDR_CONFIRMED"));
|
|
break;
|
|
default:
|
|
DC_ERROR(("UNKNOWN SCP STATE"));
|
|
break;
|
|
}
|
|
if (spc->spc_error) {
|
|
DC_ERROR((" (error = 0x%08x).\n", spc->spc_error));
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleRemoteErrorEvent(
|
|
const struct sctp_remote_error* sre) {
|
|
size_t i, n;
|
|
|
|
n = sre->sre_length - sizeof(struct sctp_remote_error);
|
|
DC_WARN(("Remote Error (error = 0x%04x): ", sre->sre_error));
|
|
for (i = 0; i < n; ++i) {
|
|
DC_WARN((" 0x%02x", sre->sre_data[i]));
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleShutdownEvent(
|
|
const struct sctp_shutdown_event* sse) {
|
|
DC_DEBUG(("Shutdown event."));
|
|
/* XXX: notify all channels. */
|
|
// Attempts to actually send anything will fail
|
|
}
|
|
|
|
void DataChannelConnection::HandleAdaptationIndication(
|
|
const struct sctp_adaptation_event* sai) {
|
|
DC_DEBUG(("Adaptation indication: %x.", sai->sai_adaptation_ind));
|
|
}
|
|
|
|
void DataChannelConnection::HandlePartialDeliveryEvent(
|
|
const struct sctp_pdapi_event* spde) {
|
|
// Note: Be aware that stream and sequence number being u32 instead of u16 is
|
|
// a bug in the SCTP API. This may change in the future.
|
|
|
|
DC_DEBUG(("Partial delivery event: "));
|
|
switch (spde->pdapi_indication) {
|
|
case SCTP_PARTIAL_DELIVERY_ABORTED:
|
|
DC_DEBUG(("delivery aborted "));
|
|
break;
|
|
default:
|
|
DC_ERROR(("??? "));
|
|
break;
|
|
}
|
|
DC_DEBUG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32,
|
|
spde->pdapi_flags, spde->pdapi_stream, spde->pdapi_seq));
|
|
|
|
// Validate stream ID
|
|
if (spde->pdapi_stream >= UINT16_MAX) {
|
|
DC_ERROR(("Invalid stream id in partial delivery event: %" PRIu32 "\n",
|
|
spde->pdapi_stream));
|
|
return;
|
|
}
|
|
|
|
// Find channel and reset buffer
|
|
DataChannel* channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
|
|
if (channel) {
|
|
DC_WARN(("Abort partially delivered message of %zu bytes\n",
|
|
channel->mRecvBuffer.Length()));
|
|
channel->mRecvBuffer.Truncate(0);
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleSendFailedEvent(
|
|
const struct sctp_send_failed_event* ssfe) {
|
|
size_t i, n;
|
|
|
|
if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
|
|
DC_DEBUG(("Unsent "));
|
|
}
|
|
if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
|
|
DC_DEBUG(("Sent "));
|
|
}
|
|
if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
|
|
DC_DEBUG(("(flags = %x) ", ssfe->ssfe_flags));
|
|
}
|
|
#ifdef XP_WIN
|
|
# define PRIPPID "lu"
|
|
#else
|
|
# define PRIPPID "u"
|
|
#endif
|
|
DC_DEBUG(("message with PPID = %" PRIPPID
|
|
", SID = %d, flags: 0x%04x due to error = 0x%08x",
|
|
ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
|
|
ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
|
|
#undef PRIPPID
|
|
n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
|
|
for (i = 0; i < n; ++i) {
|
|
DC_DEBUG((" 0x%02x", ssfe->ssfe_data[i]));
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::ClearResets() {
|
|
// Clear all pending resets
|
|
if (!mStreamsResetting.IsEmpty()) {
|
|
DC_DEBUG(("Clearing resets for %zu streams", mStreamsResetting.Length()));
|
|
}
|
|
|
|
for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
|
|
RefPtr<DataChannel> channel;
|
|
channel = FindChannelByStream(mStreamsResetting[i]);
|
|
if (channel) {
|
|
DC_DEBUG(("Forgetting channel %u (%p) with pending reset",
|
|
channel->mStream, channel.get()));
|
|
// TODO: Do we _really_ want to remove this? Are we allowed to reuse the
|
|
// id?
|
|
mChannels.Remove(channel);
|
|
}
|
|
}
|
|
mStreamsResetting.Clear();
|
|
}
|
|
|
|
void DataChannelConnection::ResetOutgoingStream(uint16_t stream) {
|
|
uint32_t i;
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
DC_DEBUG(
|
|
("Connection %p: Resetting outgoing stream %u", (void*)this, stream));
|
|
// Rarely has more than a couple items and only for a short time
|
|
for (i = 0; i < mStreamsResetting.Length(); ++i) {
|
|
if (mStreamsResetting[i] == stream) {
|
|
return;
|
|
}
|
|
}
|
|
mStreamsResetting.AppendElement(stream);
|
|
}
|
|
|
|
void DataChannelConnection::SendOutgoingStreamReset() {
|
|
struct sctp_reset_streams* srs;
|
|
uint32_t i;
|
|
size_t len;
|
|
|
|
DC_DEBUG(("Connection %p: Sending outgoing stream reset for %zu streams",
|
|
(void*)this, mStreamsResetting.Length()));
|
|
mLock.AssertCurrentThreadOwns();
|
|
if (mStreamsResetting.IsEmpty()) {
|
|
DC_DEBUG(("No streams to reset"));
|
|
return;
|
|
}
|
|
len = sizeof(sctp_assoc_t) +
|
|
(2 + mStreamsResetting.Length()) * sizeof(uint16_t);
|
|
srs = static_cast<struct sctp_reset_streams*>(
|
|
moz_xmalloc(len)); // infallible malloc
|
|
memset(srs, 0, len);
|
|
srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
|
|
srs->srs_number_streams = mStreamsResetting.Length();
|
|
for (i = 0; i < mStreamsResetting.Length(); ++i) {
|
|
srs->srs_stream_list[i] = mStreamsResetting[i];
|
|
}
|
|
if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs,
|
|
(socklen_t)len) < 0) {
|
|
DC_ERROR(("***failed: setsockopt RESET, errno %d", errno));
|
|
// if errno == EALREADY, this is normal - we can't send another reset
|
|
// with one pending.
|
|
// When we get an incoming reset (which may be a response to our
|
|
// outstanding one), see if we have any pending outgoing resets and
|
|
// send them
|
|
} else {
|
|
mStreamsResetting.Clear();
|
|
}
|
|
free(srs);
|
|
}
|
|
|
|
void DataChannelConnection::HandleStreamResetEvent(
|
|
const struct sctp_stream_reset_event* strrst) {
|
|
uint32_t n, i;
|
|
RefPtr<DataChannel> channel; // since we may null out the ref to the channel
|
|
|
|
if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
|
|
!(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
|
|
n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) /
|
|
sizeof(uint16_t);
|
|
for (i = 0; i < n; ++i) {
|
|
if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
|
|
channel = FindChannelByStream(strrst->strreset_stream_list[i]);
|
|
if (channel) {
|
|
// The other side closed the channel
|
|
// We could be in three states:
|
|
// 1. Normal state (input and output streams (OPEN)
|
|
// Notify application, send a RESET in response on our
|
|
// outbound channel. Go to CLOSED
|
|
// 2. We sent our own reset (CLOSING); either they crossed on the
|
|
// wire, or this is a response to our Reset.
|
|
// Go to CLOSED
|
|
// 3. We've sent a open but haven't gotten a response yet (CONNECTING)
|
|
// I believe this is impossible, as we don't have an input stream
|
|
// yet.
|
|
|
|
DC_DEBUG(("Incoming: Channel %u closed", channel->mStream));
|
|
if (mChannels.Remove(channel)) {
|
|
// Mark the stream for reset (the reset is sent below)
|
|
ResetOutgoingStream(channel->mStream);
|
|
}
|
|
|
|
DC_DEBUG(("Disconnected DataChannel %p from connection %p",
|
|
(void*)channel.get(), (void*)channel->mConnection.get()));
|
|
channel->StreamClosedLocked();
|
|
} else {
|
|
DC_WARN(("Can't find incoming channel %d", i));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Process any pending resets now:
|
|
if (!mStreamsResetting.IsEmpty()) {
|
|
DC_DEBUG(("Sending %zu pending resets", mStreamsResetting.Length()));
|
|
SendOutgoingStreamReset();
|
|
}
|
|
}
|
|
|
|
void DataChannelConnection::HandleStreamChangeEvent(
|
|
const struct sctp_stream_change_event* strchg) {
|
|
ASSERT_WEBRTC(!NS_IsMainThread());
|
|
if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
|
|
DC_ERROR(("*** Failed increasing number of streams from %zu (%u/%u)",
|
|
mNegotiatedIdLimit, strchg->strchange_instrms,
|
|
strchg->strchange_outstrms));
|
|
// XXX FIX! notify pending opens of failure
|
|
return;
|
|
}
|
|
if (strchg->strchange_instrms > mNegotiatedIdLimit) {
|
|
DC_DEBUG(("Other side increased streams from %zu to %u", mNegotiatedIdLimit,
|
|
strchg->strchange_instrms));
|
|
}
|
|
uint16_t old_limit = mNegotiatedIdLimit;
|
|
uint16_t new_limit =
|
|
std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
|
|
if (new_limit > mNegotiatedIdLimit) {
|
|
DC_DEBUG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
|
|
old_limit, new_limit, new_limit - old_limit,
|
|
strchg->strchange_instrms));
|
|
// make sure both are the same length
|
|
mNegotiatedIdLimit = new_limit;
|
|
DC_DEBUG(("New length = %zu (was %d)", mNegotiatedIdLimit, old_limit));
|
|
// Re-process any channels waiting for streams.
|
|
// Linear search, but we don't increase channels often and
|
|
// the array would only get long in case of an app error normally
|
|
|
|
// Make sure we request enough streams if there's a big jump in streams
|
|
// Could make a more complex API for OpenXxxFinish() and avoid this loop
|
|
auto channels = mChannels.GetAll();
|
|
size_t num_needed =
|
|
channels.Length() ? (channels.LastElement()->mStream + 1) : 0;
|
|
MOZ_ASSERT(num_needed != INVALID_STREAM);
|
|
if (num_needed > new_limit) {
|
|
int32_t more_needed = num_needed - ((int32_t)mNegotiatedIdLimit) + 16;
|
|
DC_DEBUG(("Not enough new streams, asking for %d more", more_needed));
|
|
// TODO: parameter is an int32_t but we pass size_t
|
|
RequestMoreStreams(more_needed);
|
|
} else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
|
|
DC_DEBUG(("Requesting %d output streams to match partner",
|
|
strchg->strchange_instrms - strchg->strchange_outstrms));
|
|
RequestMoreStreams(strchg->strchange_instrms -
|
|
strchg->strchange_outstrms);
|
|
}
|
|
|
|
ProcessQueuedOpens();
|
|
}
|
|
// else probably not a change in # of streams
|
|
|
|
if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
|
|
(strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
|
|
// Other side denied our request. Need to AnnounceClosed some stuff.
|
|
for (auto& channel : mChannels.GetAll()) {
|
|
if (channel->mStream >= mNegotiatedIdLimit) {
|
|
/* XXX: Signal to the other end. */
|
|
channel->AnnounceClosed();
|
|
// maybe fire onError (bug 843625)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called with mLock locked!
|
|
void DataChannelConnection::HandleNotification(
|
|
const union sctp_notification* notif, size_t n) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
if (notif->sn_header.sn_length != (uint32_t)n) {
|
|
return;
|
|
}
|
|
switch (notif->sn_header.sn_type) {
|
|
case SCTP_ASSOC_CHANGE:
|
|
HandleAssociationChangeEvent(&(notif->sn_assoc_change));
|
|
break;
|
|
case SCTP_PEER_ADDR_CHANGE:
|
|
HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
|
|
break;
|
|
case SCTP_REMOTE_ERROR:
|
|
HandleRemoteErrorEvent(&(notif->sn_remote_error));
|
|
break;
|
|
case SCTP_SHUTDOWN_EVENT:
|
|
HandleShutdownEvent(&(notif->sn_shutdown_event));
|
|
break;
|
|
case SCTP_ADAPTATION_INDICATION:
|
|
HandleAdaptationIndication(&(notif->sn_adaptation_event));
|
|
break;
|
|
case SCTP_AUTHENTICATION_EVENT:
|
|
DC_DEBUG(("SCTP_AUTHENTICATION_EVENT"));
|
|
break;
|
|
case SCTP_SENDER_DRY_EVENT:
|
|
// DC_DEBUG(("SCTP_SENDER_DRY_EVENT"));
|
|
break;
|
|
case SCTP_NOTIFICATIONS_STOPPED_EVENT:
|
|
DC_DEBUG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
|
|
break;
|
|
case SCTP_PARTIAL_DELIVERY_EVENT:
|
|
HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
|
|
break;
|
|
case SCTP_SEND_FAILED_EVENT:
|
|
HandleSendFailedEvent(&(notif->sn_send_failed_event));
|
|
break;
|
|
case SCTP_STREAM_RESET_EVENT:
|
|
HandleStreamResetEvent(&(notif->sn_strreset_event));
|
|
break;
|
|
case SCTP_ASSOC_RESET_EVENT:
|
|
DC_DEBUG(("SCTP_ASSOC_RESET_EVENT"));
|
|
break;
|
|
case SCTP_STREAM_CHANGE_EVENT:
|
|
HandleStreamChangeEvent(&(notif->sn_strchange_event));
|
|
break;
|
|
default:
|
|
DC_ERROR(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
|
|
break;
|
|
}
|
|
}
|
|
|
|
int DataChannelConnection::ReceiveCallback(
|
|
struct socket* sock, void* data, size_t datalen, struct sctp_rcvinfo rcv,
|
|
int flags) NO_THREAD_SAFETY_ANALYSIS {
|
|
ASSERT_WEBRTC(!NS_IsMainThread());
|
|
DC_DEBUG(("In ReceiveCallback"));
|
|
|
|
if (!data) {
|
|
DC_DEBUG(("ReceiveCallback: SCTP has finished shutting down"));
|
|
} else {
|
|
bool locked = false;
|
|
if (!IsSTSThread()) {
|
|
mLock.Lock();
|
|
locked = true;
|
|
} else {
|
|
mLock.AssertCurrentThreadOwns();
|
|
}
|
|
if (flags & MSG_NOTIFICATION) {
|
|
HandleNotification(static_cast<union sctp_notification*>(data), datalen);
|
|
} else {
|
|
HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
|
|
}
|
|
if (locked) {
|
|
mLock.Unlock();
|
|
}
|
|
// sctp allocates 'data' with malloc(), and expects the receiver to free
|
|
// it (presumably with free).
|
|
// XXX future optimization: try to deliver messages without an internal
|
|
// alloc/copy, and if so delay the free until later.
|
|
free(data);
|
|
}
|
|
// usrsctp defines the callback as returning an int, but doesn't use it
|
|
return 1;
|
|
}
|
|
|
|
already_AddRefed<DataChannel> DataChannelConnection::Open(
|
|
const nsACString& label, const nsACString& protocol, Type type,
|
|
bool inOrder, uint32_t prValue, DataChannelListener* aListener,
|
|
nsISupports* aContext, bool aExternalNegotiated, uint16_t aStream) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
MutexAutoLock lock(mLock); // OpenFinish assumes this
|
|
if (!aExternalNegotiated) {
|
|
if (mAllocateEven.isSome()) {
|
|
aStream = FindFreeStream();
|
|
if (aStream == INVALID_STREAM) {
|
|
return nullptr;
|
|
}
|
|
} else {
|
|
// We do not yet know whether we are client or server, and an id has not
|
|
// been chosen for us. We will need to choose later.
|
|
aStream = INVALID_STREAM;
|
|
}
|
|
}
|
|
uint16_t prPolicy = SCTP_PR_SCTP_NONE;
|
|
|
|
DC_DEBUG(
|
|
("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, "
|
|
"context %p, external: %s, stream %u",
|
|
PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
|
|
type, inOrder, prValue, aListener, aContext,
|
|
aExternalNegotiated ? "true" : "false", aStream));
|
|
switch (type) {
|
|
case DATA_CHANNEL_RELIABLE:
|
|
prPolicy = SCTP_PR_SCTP_NONE;
|
|
break;
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
|
|
prPolicy = SCTP_PR_SCTP_RTX;
|
|
break;
|
|
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
|
|
prPolicy = SCTP_PR_SCTP_TTL;
|
|
break;
|
|
default:
|
|
DC_ERROR(("unsupported channel type: %u", type));
|
|
MOZ_ASSERT(false);
|
|
return nullptr;
|
|
}
|
|
if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
|
|
return nullptr;
|
|
}
|
|
|
|
if (aStream != INVALID_STREAM && mChannels.Get(aStream)) {
|
|
DC_ERROR(("external negotiation of already-open channel %u", aStream));
|
|
return nullptr;
|
|
}
|
|
|
|
RefPtr<DataChannel> channel(new DataChannel(
|
|
this, aStream, DataChannel::CONNECTING, label, protocol, prPolicy,
|
|
prValue, inOrder, aExternalNegotiated, aListener, aContext));
|
|
mChannels.Insert(channel);
|
|
|
|
return OpenFinish(channel.forget());
|
|
}
|
|
|
|
// Separate routine so we can also call it to finish up from pending opens
|
|
already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
|
|
already_AddRefed<DataChannel>&& aChannel) {
|
|
RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
|
|
// Normally 1 reference if called from ::Open(), or 2 if called from
|
|
// ProcessQueuedOpens() unless the DOMDataChannel was gc'd
|
|
const uint16_t stream = channel->mStream;
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
// Cases we care about:
|
|
// Pre-negotiated:
|
|
// Not Open:
|
|
// Doesn't fit:
|
|
// -> change initial ask or renegotiate after open
|
|
// -> queue open
|
|
// Open:
|
|
// Doesn't fit:
|
|
// -> RequestMoreStreams && queue
|
|
// Does fit:
|
|
// -> open
|
|
// Not negotiated:
|
|
// Not Open:
|
|
// -> queue open
|
|
// Open:
|
|
// -> Try to get a stream
|
|
// Doesn't fit:
|
|
// -> RequestMoreStreams && queue
|
|
// Does fit:
|
|
// -> open
|
|
// So the Open cases are basically the same
|
|
// Not Open cases are simply queue for non-negotiated, and
|
|
// either change the initial ask or possibly renegotiate after open.
|
|
const auto readyState = GetReadyState();
|
|
if (readyState != OPEN || stream >= mNegotiatedIdLimit) {
|
|
if (readyState == OPEN) {
|
|
MOZ_ASSERT(stream != INVALID_STREAM);
|
|
// RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra
|
|
// streams to avoid going back immediately for more if the ask to N, N+1,
|
|
// etc
|
|
int32_t more_needed = stream - ((int32_t)mNegotiatedIdLimit) + 16;
|
|
if (!RequestMoreStreams(more_needed)) {
|
|
// Something bad happened... we're done
|
|
goto request_error_cleanup;
|
|
}
|
|
}
|
|
DC_DEBUG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
|
|
// Also serves to mark we told the app
|
|
channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
|
|
mPending.Push(channel);
|
|
return channel.forget();
|
|
}
|
|
|
|
MOZ_ASSERT(stream != INVALID_STREAM);
|
|
MOZ_ASSERT(stream < mNegotiatedIdLimit);
|
|
|
|
#ifdef TEST_QUEUED_DATA
|
|
// It's painful to write a test for this...
|
|
channel->AnnounceOpen();
|
|
SendDataMsgInternalOrBuffer(channel, "Help me!", 8,
|
|
DATA_CHANNEL_PPID_DOMSTRING);
|
|
#endif
|
|
|
|
if (!channel->mOrdered) {
|
|
// Don't send unordered until this gets cleared
|
|
channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
|
|
}
|
|
|
|
if (!channel->mNegotiated) {
|
|
int error = SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
|
|
stream, !channel->mOrdered,
|
|
channel->mPrPolicy, channel->mPrValue);
|
|
if (error) {
|
|
DC_ERROR(("SendOpenRequest failed, error = %d", error));
|
|
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
|
|
// We already returned the channel to the app.
|
|
NS_ERROR("Failed to send open request");
|
|
channel->AnnounceClosed();
|
|
}
|
|
// If we haven't returned the channel yet, it will get destroyed when we
|
|
// exit this function.
|
|
mChannels.Remove(channel);
|
|
// we'll be destroying the channel
|
|
return nullptr;
|
|
/* NOTREACHED */
|
|
}
|
|
}
|
|
|
|
// Either externally negotiated or we sent Open
|
|
// FIX? Move into DOMDataChannel? I don't think we can send it yet here
|
|
channel->AnnounceOpen();
|
|
|
|
return channel.forget();
|
|
|
|
request_error_cleanup:
|
|
if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
|
|
// We already returned the channel to the app.
|
|
NS_ERROR("Failed to request more streams");
|
|
channel->AnnounceClosed();
|
|
return channel.forget();
|
|
}
|
|
// we'll be destroying the channel, but it never really got set up
|
|
// Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
|
|
// Dispatch it to ourselves
|
|
return nullptr;
|
|
}
|
|
|
|
// Requires mLock to be locked!
|
|
// Returns a POSIX error code directly instead of setting errno.
|
|
int DataChannelConnection::SendMsgInternal(OutgoingMsg& msg, size_t* aWritten) {
|
|
auto& info = msg.GetInfo().sendv_sndinfo;
|
|
int error;
|
|
|
|
// EOR set?
|
|
bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
|
|
|
|
// Send until buffer is empty
|
|
size_t left = msg.GetLeft();
|
|
do {
|
|
size_t length;
|
|
|
|
// Carefully chunk the buffer
|
|
if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
|
|
length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
|
|
|
|
// Unset EOR flag
|
|
info.snd_flags &= ~SCTP_EOR;
|
|
} else {
|
|
length = left;
|
|
|
|
// Set EOR flag
|
|
if (eor_set) {
|
|
info.snd_flags |= SCTP_EOR;
|
|
}
|
|
}
|
|
|
|
// Send (or try at least)
|
|
// SCTP will return EMSGSIZE if the message is bigger than the buffer
|
|
// size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
|
|
// by carefully crafting small enough message chunks.
|
|
ssize_t written = usrsctp_sendv(
|
|
mSocket, msg.GetData(), length, nullptr, 0, (void*)&msg.GetInfo(),
|
|
(socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
|
|
|
|
if (written < 0) {
|
|
error = errno;
|
|
goto out;
|
|
}
|
|
|
|
if (aWritten) {
|
|
*aWritten += written;
|
|
}
|
|
DC_DEBUG(("Sent buffer (written=%zu, len=%zu, left=%zu)", (size_t)written,
|
|
length, left - (size_t)written));
|
|
|
|
// TODO: Remove once resolved
|
|
// (https://github.com/sctplab/usrsctp/issues/132)
|
|
if (written == 0) {
|
|
DC_ERROR(("@tuexen: usrsctp_sendv returned 0"));
|
|
error = EAGAIN;
|
|
goto out;
|
|
}
|
|
|
|
// If not all bytes have been written, this obviously means that usrsctp's
|
|
// buffer is full and we need to try again later.
|
|
if ((size_t)written < length) {
|
|
msg.Advance((size_t)written);
|
|
error = EAGAIN;
|
|
goto out;
|
|
}
|
|
|
|
// Update buffer position
|
|
msg.Advance((size_t)written);
|
|
|
|
// Get amount of bytes left in the buffer
|
|
left = msg.GetLeft();
|
|
} while (left > 0);
|
|
|
|
// Done
|
|
error = 0;
|
|
|
|
out:
|
|
// Reset EOR flag
|
|
if (eor_set) {
|
|
info.snd_flags |= SCTP_EOR;
|
|
}
|
|
|
|
return error;
|
|
}
|
|
|
|
// Requires mLock to be locked!
|
|
// Returns a POSIX error code directly instead of setting errno.
|
|
// IMPORTANT: Ensure that the buffer passed is guarded by mLock!
|
|
int DataChannelConnection::SendMsgInternalOrBuffer(
|
|
nsTArray<UniquePtr<BufferedOutgoingMsg>>& buffer, OutgoingMsg& msg,
|
|
bool& buffered, size_t* aWritten) {
|
|
NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
|
|
|
|
int error = 0;
|
|
bool need_buffering = false;
|
|
|
|
// Note: Main-thread IO, but doesn't block!
|
|
// XXX FIX! to deal with heavy overruns of JS trying to pass data in
|
|
// (more than the buffersize) queue data onto another thread to do the
|
|
// actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
|
|
|
|
// Avoid a race between buffer-full-failure (where we have to add the
|
|
// packet to the buffered-data queue) and the buffer-now-only-half-full
|
|
// callback, which happens on a different thread. Otherwise we might
|
|
// fail here, then before we add it to the queue get the half-full
|
|
// callback, find nothing to do, then on this thread add it to the
|
|
// queue - which would sit there. Also, if we later send more data, it
|
|
// would arrive ahead of the buffered message, but if the buffer ever
|
|
// got to 1/2 full, the message would get sent - but at a semi-random
|
|
// time, after other data it was supposed to be in front of.
|
|
|
|
// Must lock before empty check for similar reasons!
|
|
mLock.AssertCurrentThreadOwns();
|
|
if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
|
|
error = SendMsgInternal(msg, aWritten);
|
|
switch (error) {
|
|
case 0:
|
|
break;
|
|
case EAGAIN:
|
|
#if (EAGAIN != EWOULDBLOCK)
|
|
case EWOULDBLOCK:
|
|
#endif
|
|
need_buffering = true;
|
|
break;
|
|
default:
|
|
DC_ERROR(("error %d on sending", error));
|
|
break;
|
|
}
|
|
} else {
|
|
need_buffering = true;
|
|
}
|
|
|
|
if (need_buffering) {
|
|
// queue data for resend! And queue any further data for the stream until
|
|
// it is...
|
|
auto* bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
|
|
buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
|
|
DC_DEBUG(("Queued %zu buffers (left=%zu, total=%zu)", buffer.Length(),
|
|
msg.GetLeft(), msg.GetLength()));
|
|
buffered = true;
|
|
return 0;
|
|
}
|
|
|
|
buffered = false;
|
|
return error;
|
|
}
|
|
|
|
// Caller must ensure that length <= UINT32_MAX
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel& channel,
|
|
const uint8_t* data,
|
|
size_t len,
|
|
uint32_t ppid) {
|
|
if (NS_WARN_IF(channel.GetReadyState() != OPEN)) {
|
|
return EINVAL; // TODO: Find a better error code
|
|
}
|
|
|
|
struct sctp_sendv_spa info = {0};
|
|
|
|
// General flags
|
|
info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
|
|
|
|
// Set stream identifier, protocol identifier and flags
|
|
info.sendv_sndinfo.snd_sid = channel.mStream;
|
|
info.sendv_sndinfo.snd_flags = SCTP_EOR;
|
|
info.sendv_sndinfo.snd_ppid = htonl(ppid);
|
|
|
|
// Unordered?
|
|
// To avoid problems where an in-order OPEN is lost and an
|
|
// out-of-order data message "beats" it, require data to be in-order
|
|
// until we get an ACK.
|
|
if (!channel.mOrdered && !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
|
|
info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
|
|
}
|
|
|
|
// Partial reliability policy
|
|
if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
|
|
info.sendv_prinfo.pr_policy = channel.mPrPolicy;
|
|
info.sendv_prinfo.pr_value = channel.mPrValue;
|
|
info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
|
|
}
|
|
|
|
// Create message instance and send
|
|
OutgoingMsg msg(info, data, len);
|
|
bool buffered;
|
|
size_t written = 0;
|
|
mDeferSend = true;
|
|
int error =
|
|
SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered, &written);
|
|
mDeferSend = false;
|
|
if (written) {
|
|
channel.DecrementBufferedAmount(written);
|
|
}
|
|
|
|
for (auto&& packet : mDeferredSend) {
|
|
MOZ_ASSERT(written);
|
|
SendPacket(std::move(packet));
|
|
}
|
|
mDeferredSend.clear();
|
|
|
|
// Set pending type and stream index (if buffered)
|
|
if (!error && buffered && !mPendingType) {
|
|
mPendingType = PENDING_DATA;
|
|
mCurrentStream = channel.mStream;
|
|
}
|
|
return error;
|
|
}
|
|
|
|
// Caller must ensure that length <= UINT32_MAX
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendDataMsg(DataChannel& channel,
|
|
const uint8_t* data, size_t len,
|
|
uint32_t ppidPartial,
|
|
uint32_t ppidFinal) {
|
|
// We *really* don't want to do this from main thread! - and
|
|
// SendDataMsgInternalOrBuffer avoids blocking.
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
|
|
DC_ERROR(("Message rejected, too large (%zu > %" PRIu64 ")", len,
|
|
mMaxMessageSize));
|
|
return EMSGSIZE;
|
|
}
|
|
|
|
// This will use EOR-based fragmentation if the message is too large (> 64
|
|
// KiB)
|
|
return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
|
|
}
|
|
|
|
class ReadBlobRunnable : public Runnable {
|
|
public:
|
|
ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
|
|
nsIInputStream* aBlob)
|
|
: Runnable("ReadBlobRunnable"),
|
|
mConnection(aConnection),
|
|
mStream(aStream),
|
|
mBlob(aBlob) {}
|
|
|
|
NS_IMETHOD Run() override {
|
|
// ReadBlob() is responsible to releasing the reference
|
|
DataChannelConnection* self = mConnection;
|
|
self->ReadBlob(mConnection.forget(), mStream, mBlob);
|
|
return NS_OK;
|
|
}
|
|
|
|
private:
|
|
// Make sure the Connection doesn't die while there are jobs outstanding.
|
|
// Let it die (if released by PeerConnectionImpl while we're running)
|
|
// when we send our runnable back to MainThread. Then ~DataChannelConnection
|
|
// can send the IOThread to MainThread to die in a runnable, avoiding
|
|
// unsafe event loop recursion. Evil.
|
|
RefPtr<DataChannelConnection> mConnection;
|
|
uint16_t mStream;
|
|
// Use RefCount for preventing the object is deleted when SendBlob returns.
|
|
RefPtr<nsIInputStream> mBlob;
|
|
};
|
|
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream* aBlob) {
|
|
MutexAutoLock lock(mLock);
|
|
RefPtr<DataChannel> channel = mChannels.Get(stream);
|
|
if (NS_WARN_IF(!channel)) {
|
|
return EINVAL; // TODO: Find a better error code
|
|
}
|
|
|
|
// Spawn a thread to send the data
|
|
if (!mInternalIOThread) {
|
|
nsresult rv =
|
|
NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
|
|
if (NS_FAILED(rv)) {
|
|
return EINVAL; // TODO: Find a better error code
|
|
}
|
|
}
|
|
|
|
mInternalIOThread->Dispatch(
|
|
do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
|
|
return 0;
|
|
}
|
|
|
|
class DataChannelBlobSendRunnable : public Runnable {
|
|
public:
|
|
DataChannelBlobSendRunnable(
|
|
already_AddRefed<DataChannelConnection>& aConnection, uint16_t aStream)
|
|
: Runnable("DataChannelBlobSendRunnable"),
|
|
mConnection(aConnection),
|
|
mStream(aStream) {}
|
|
|
|
~DataChannelBlobSendRunnable() override {
|
|
if (!NS_IsMainThread() && mConnection) {
|
|
MOZ_ASSERT(false);
|
|
// explicitly leak the connection if destroyed off mainthread
|
|
Unused << mConnection.forget().take();
|
|
}
|
|
}
|
|
|
|
NS_IMETHOD Run() override {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
|
|
mConnection->SendBinaryMsg(mStream, mData);
|
|
mConnection = nullptr;
|
|
return NS_OK;
|
|
}
|
|
|
|
// explicitly public so we can avoid allocating twice and copying
|
|
nsCString mData;
|
|
|
|
private:
|
|
// Note: we can be destroyed off the target thread, so be careful not to let
|
|
// this get Released()ed on the temp thread!
|
|
RefPtr<DataChannelConnection> mConnection;
|
|
uint16_t mStream;
|
|
};
|
|
|
|
static auto readyStateToCStr(const uint16_t state) -> const char* {
|
|
switch (state) {
|
|
case DataChannelConnection::CONNECTING:
|
|
return "CONNECTING";
|
|
case DataChannelConnection::OPEN:
|
|
return "OPEN";
|
|
case DataChannelConnection::CLOSING:
|
|
return "CLOSING";
|
|
case DataChannelConnection::CLOSED:
|
|
return "CLOSED";
|
|
default: {
|
|
MOZ_ASSERT(false);
|
|
return "UNKNOWW";
|
|
}
|
|
}
|
|
};
|
|
|
|
void DataChannelConnection::SetReadyState(const uint16_t aState) {
|
|
mLock.AssertCurrentThreadOwns();
|
|
|
|
DC_DEBUG(
|
|
("DataChannelConnection labeled %s (%p) switching connection state %s -> "
|
|
"%s",
|
|
mTransportId.c_str(), this, readyStateToCStr(mState),
|
|
readyStateToCStr(aState)));
|
|
|
|
mState = aState;
|
|
}
|
|
|
|
void DataChannelConnection::ReadBlob(
|
|
already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
|
|
nsIInputStream* aBlob) {
|
|
// NOTE: 'aThis' has been forgotten by the caller to avoid releasing
|
|
// it off mainthread; if PeerConnectionImpl has released then we want
|
|
// ~DataChannelConnection() to run on MainThread
|
|
|
|
// XXX to do this safely, we must enqueue these atomically onto the
|
|
// output socket. We need a sender thread(s?) to enqueue data into the
|
|
// socket and to avoid main-thread IO that might block. Even on a
|
|
// background thread, we may not want to block on one stream's data.
|
|
// I.e. run non-blocking and service multiple channels.
|
|
|
|
// Must not let Dispatching it cause the DataChannelConnection to get
|
|
// released on the wrong thread. Using
|
|
// WrapRunnable(RefPtr<DataChannelConnection>(aThis),... will occasionally
|
|
// cause aThis to get released on this thread. Also, an explicit Runnable
|
|
// lets us avoid copying the blob data an extra time.
|
|
RefPtr<DataChannelBlobSendRunnable> runnable =
|
|
new DataChannelBlobSendRunnable(aThis, aStream);
|
|
// avoid copying the blob data by passing the mData from the runnable
|
|
if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) {
|
|
// Bug 966602: Doesn't return an error to the caller via onerror.
|
|
// We must release DataChannelConnection on MainThread to avoid issues (bug
|
|
// 876167) aThis is now owned by the runnable; release it there
|
|
NS_ReleaseOnMainThread("DataChannelBlobSendRunnable", runnable.forget());
|
|
return;
|
|
}
|
|
aBlob->Close();
|
|
Dispatch(runnable.forget());
|
|
}
|
|
|
|
// Returns a POSIX error code.
|
|
int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
|
|
const nsACString& aMsg,
|
|
bool isBinary) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
// We really could allow this from other threads, so long as we deal with
|
|
// asynchronosity issues with channels closing, in particular access to
|
|
// mChannels, and issues with the association closing (access to mSocket).
|
|
|
|
const uint8_t* data = (const uint8_t*)aMsg.BeginReading();
|
|
uint32_t len = aMsg.Length();
|
|
#if (UINT32_MAX > SIZE_MAX)
|
|
if (len > SIZE_MAX) {
|
|
return EMSGSIZE;
|
|
}
|
|
#endif
|
|
|
|
DC_DEBUG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "",
|
|
stream, len));
|
|
// XXX if we want more efficiency, translate flags once at open time
|
|
RefPtr<DataChannel> channelPtr = mChannels.Get(stream);
|
|
if (NS_WARN_IF(!channelPtr)) {
|
|
return EINVAL; // TODO: Find a better error code
|
|
}
|
|
|
|
auto& channel = *channelPtr;
|
|
int err = 0;
|
|
MutexAutoLock lock(mLock);
|
|
if (isBinary) {
|
|
err = SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_BINARY_PARTIAL,
|
|
DATA_CHANNEL_PPID_BINARY);
|
|
} else {
|
|
err = SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING_PARTIAL,
|
|
DATA_CHANNEL_PPID_DOMSTRING);
|
|
}
|
|
if (!err) {
|
|
channel.WithTrafficCounters([&len](DataChannel::TrafficCounters& counters) {
|
|
counters.mMessagesSent++;
|
|
counters.mBytesSent += len;
|
|
});
|
|
}
|
|
|
|
return err;
|
|
}
|
|
|
|
void DataChannelConnection::Stop() {
|
|
// Note: This will call 'CloseAll' from the main thread
|
|
Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
|
|
DataChannelOnMessageAvailable::ON_DISCONNECTED, this)));
|
|
}
|
|
|
|
void DataChannelConnection::Close(DataChannel* aChannel) {
|
|
MutexAutoLock lock(mLock);
|
|
CloseLocked(aChannel);
|
|
}
|
|
|
|
// So we can call Close() with the lock already held
|
|
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
|
|
void DataChannelConnection::CloseLocked(DataChannel* aChannel) {
|
|
MOZ_ASSERT(aChannel);
|
|
RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
|
|
|
|
mLock.AssertCurrentThreadOwns();
|
|
DC_DEBUG(("Connection %p/Channel %p: Closing stream %u",
|
|
channel->mConnection.get(), channel.get(), channel->mStream));
|
|
|
|
aChannel->mBufferedData.Clear();
|
|
if (GetReadyState() == CLOSED) {
|
|
// If we're CLOSING, we might leave this in place until we can send a
|
|
// reset.
|
|
mChannels.Remove(channel);
|
|
}
|
|
|
|
// This is supposed to only be accessed from Main thread, but this has
|
|
// been accessed here from the STS thread for a long time now.
|
|
// See Bug 1586475
|
|
auto channelState = aChannel->mReadyState;
|
|
// re-test since it may have closed before the lock was grabbed
|
|
if (channelState == CLOSED || channelState == CLOSING) {
|
|
DC_DEBUG(("Channel already closing/closed (%u)", channelState));
|
|
return;
|
|
}
|
|
|
|
if (channel->mStream != INVALID_STREAM) {
|
|
ResetOutgoingStream(channel->mStream);
|
|
if (GetReadyState() != CLOSED) {
|
|
// Individual channel is being closed, send reset now.
|
|
SendOutgoingStreamReset();
|
|
}
|
|
}
|
|
aChannel->SetReadyState(CLOSING);
|
|
if (GetReadyState() == CLOSED) {
|
|
// we're not going to hang around waiting
|
|
channel->StreamClosedLocked();
|
|
}
|
|
// At this point when we leave here, the object is a zombie held alive only by
|
|
// the DOM object
|
|
}
|
|
|
|
void DataChannelConnection::CloseAll() {
|
|
DC_DEBUG(("Closing all channels (connection %p)", (void*)this));
|
|
|
|
// Make sure no more channels will be opened
|
|
MutexAutoLock lock(mLock);
|
|
SetReadyState(CLOSED);
|
|
|
|
// Close current channels
|
|
// If there are runnables, they hold a strong ref and keep the channel
|
|
// and/or connection alive (even if in a CLOSED state)
|
|
for (auto& channel : mChannels.GetAll()) {
|
|
MutexAutoUnlock lock(mLock);
|
|
channel->Close();
|
|
}
|
|
|
|
// Clean up any pending opens for channels
|
|
RefPtr<DataChannel> channel;
|
|
while (nullptr != (channel = mPending.PopFront())) {
|
|
DC_DEBUG(("closing pending channel %p, stream %u", channel.get(),
|
|
channel->mStream));
|
|
MutexAutoUnlock lock(mLock);
|
|
channel->Close(); // also releases the ref on each iteration
|
|
}
|
|
// It's more efficient to let the Resets queue in shutdown and then
|
|
// SendOutgoingStreamReset() here.
|
|
SendOutgoingStreamReset();
|
|
}
|
|
|
|
bool DataChannelConnection::Channels::IdComparator::Equals(
|
|
const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
|
|
return aChannel->mStream == aId;
|
|
}
|
|
|
|
bool DataChannelConnection::Channels::IdComparator::LessThan(
|
|
const RefPtr<DataChannel>& aChannel, uint16_t aId) const {
|
|
return aChannel->mStream < aId;
|
|
}
|
|
|
|
bool DataChannelConnection::Channels::IdComparator::Equals(
|
|
const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
|
|
return Equals(a1, a2->mStream);
|
|
}
|
|
|
|
bool DataChannelConnection::Channels::IdComparator::LessThan(
|
|
const RefPtr<DataChannel>& a1, const RefPtr<DataChannel>& a2) const {
|
|
return LessThan(a1, a2->mStream);
|
|
}
|
|
|
|
void DataChannelConnection::Channels::Insert(
|
|
const RefPtr<DataChannel>& aChannel) {
|
|
DC_DEBUG(("Inserting channel %u : %p", aChannel->mStream, aChannel.get()));
|
|
MutexAutoLock lock(mMutex);
|
|
if (aChannel->mStream != INVALID_STREAM) {
|
|
MOZ_ASSERT(!mChannels.ContainsSorted(aChannel, IdComparator()));
|
|
}
|
|
|
|
MOZ_ASSERT(!mChannels.Contains(aChannel));
|
|
|
|
mChannels.InsertElementSorted(aChannel, IdComparator());
|
|
}
|
|
|
|
bool DataChannelConnection::Channels::Remove(
|
|
const RefPtr<DataChannel>& aChannel) {
|
|
DC_DEBUG(("Removing channel %u : %p", aChannel->mStream, aChannel.get()));
|
|
MutexAutoLock lock(mMutex);
|
|
if (aChannel->mStream == INVALID_STREAM) {
|
|
return mChannels.RemoveElement(aChannel);
|
|
}
|
|
|
|
return mChannels.RemoveElementSorted(aChannel, IdComparator());
|
|
}
|
|
|
|
RefPtr<DataChannel> DataChannelConnection::Channels::Get(uint16_t aId) const {
|
|
MutexAutoLock lock(mMutex);
|
|
auto index = mChannels.BinaryIndexOf(aId, IdComparator());
|
|
if (index == ChannelArray::NoIndex) {
|
|
return nullptr;
|
|
}
|
|
return mChannels[index];
|
|
}
|
|
|
|
RefPtr<DataChannel> DataChannelConnection::Channels::GetNextChannel(
|
|
uint16_t aCurrentId) const {
|
|
MutexAutoLock lock(mMutex);
|
|
if (mChannels.IsEmpty()) {
|
|
return nullptr;
|
|
}
|
|
|
|
auto index = mChannels.IndexOfFirstElementGt(aCurrentId, IdComparator());
|
|
if (index == mChannels.Length()) {
|
|
index = 0;
|
|
}
|
|
return mChannels[index];
|
|
}
|
|
|
|
DataChannel::~DataChannel() {
|
|
// NS_ASSERTION since this is more "I think I caught all the cases that
|
|
// can cause this" than a true kill-the-program assertion. If this is
|
|
// wrong, nothing bad happens. A worst it's a leak.
|
|
NS_ASSERTION(mReadyState == CLOSED || mReadyState == CLOSING,
|
|
"unexpected state in ~DataChannel");
|
|
}
|
|
|
|
void DataChannel::Close() {
|
|
if (mConnection) {
|
|
// ensure we don't get deleted
|
|
RefPtr<DataChannelConnection> connection(mConnection);
|
|
connection->Close(this);
|
|
}
|
|
}
|
|
|
|
// Used when disconnecting from the DataChannelConnection
|
|
void DataChannel::StreamClosedLocked() {
|
|
mConnection->mLock.AssertCurrentThreadOwns();
|
|
ENSURE_DATACONNECTION;
|
|
|
|
DC_DEBUG(("Destroying Data channel %u", mStream));
|
|
MOZ_ASSERT_IF(mStream != INVALID_STREAM,
|
|
!mConnection->FindChannelByStream(mStream));
|
|
AnnounceClosed();
|
|
// We leave mConnection live until the DOM releases us, to avoid races
|
|
}
|
|
|
|
void DataChannel::ReleaseConnection() {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mConnection = nullptr;
|
|
}
|
|
|
|
void DataChannel::SetListener(DataChannelListener* aListener,
|
|
nsISupports* aContext) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
mContext = aContext;
|
|
mListener = aListener;
|
|
}
|
|
|
|
void DataChannel::SendErrnoToErrorResult(int error, size_t aMessageSize,
|
|
ErrorResult& aRv) {
|
|
switch (error) {
|
|
case 0:
|
|
break;
|
|
case EMSGSIZE: {
|
|
nsPrintfCString err("Message size (%zu) exceeds maxMessageSize",
|
|
aMessageSize);
|
|
aRv.ThrowTypeError(err);
|
|
break;
|
|
}
|
|
default:
|
|
aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void DataChannel::IncrementBufferedAmount(uint32_t aSize, ErrorResult& aRv) {
|
|
ASSERT_WEBRTC(NS_IsMainThread());
|
|
if (mBufferedAmount > UINT32_MAX - aSize) {
|
|
aRv.Throw(NS_ERROR_FILE_TOO_BIG);
|
|
return;
|
|
}
|
|
|
|
mBufferedAmount += aSize;
|
|
}
|
|
|
|
void DataChannel::DecrementBufferedAmount(uint32_t aSize) {
|
|
mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction(
|
|
"DataChannel::DecrementBufferedAmount",
|
|
[this, self = RefPtr<DataChannel>(this), aSize] {
|
|
MOZ_ASSERT(aSize <= mBufferedAmount);
|
|
bool wasLow = mBufferedAmount <= mBufferedThreshold;
|
|
mBufferedAmount -= aSize;
|
|
if (!wasLow && mBufferedAmount <= mBufferedThreshold) {
|
|
DC_DEBUG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u",
|
|
__FUNCTION__, mLabel.get(), mProtocol.get(), mStream));
|
|
mListener->OnBufferLow(mContext);
|
|
}
|
|
if (mBufferedAmount == 0) {
|
|
DC_DEBUG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u",
|
|
__FUNCTION__, mLabel.get(), mProtocol.get(), mStream));
|
|
mListener->NotBuffered(mContext);
|
|
}
|
|
}));
|
|
}
|
|
|
|
void DataChannel::AnnounceOpen() {
|
|
mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction(
|
|
"DataChannel::AnnounceOpen", [this, self = RefPtr<DataChannel>(this)] {
|
|
auto state = GetReadyState();
|
|
// Special-case; spec says to put brand-new remote-created DataChannel
|
|
// in "open", but queue the firing of the "open" event.
|
|
if (state != CLOSING && state != CLOSED && mListener) {
|
|
SetReadyState(OPEN);
|
|
DC_DEBUG(("%s: sending ON_CHANNEL_OPEN for %s/%s: %u", __FUNCTION__,
|
|
mLabel.get(), mProtocol.get(), mStream));
|
|
mListener->OnChannelConnected(mContext);
|
|
}
|
|
}));
|
|
}
|
|
|
|
void DataChannel::AnnounceClosed() {
|
|
mMainThreadEventTarget->Dispatch(NS_NewRunnableFunction(
|
|
"DataChannel::AnnounceClosed", [this, self = RefPtr<DataChannel>(this)] {
|
|
if (GetReadyState() == CLOSED) {
|
|
return;
|
|
}
|
|
SetReadyState(CLOSED);
|
|
mBufferedData.Clear();
|
|
if (mListener) {
|
|
DC_DEBUG(("%s: sending ON_CHANNEL_CLOSED for %s/%s: %u", __FUNCTION__,
|
|
mLabel.get(), mProtocol.get(), mStream));
|
|
mListener->OnChannelClosed(mContext);
|
|
}
|
|
}));
|
|
}
|
|
|
|
// Set ready state
|
|
void DataChannel::SetReadyState(const uint16_t aState) {
|
|
MOZ_ASSERT(NS_IsMainThread());
|
|
|
|
DC_DEBUG(
|
|
("DataChannelConnection labeled %s(%p) (stream %d) changing ready state "
|
|
"%s -> %s",
|
|
mLabel.get(), this, mStream, readyStateToCStr(mReadyState),
|
|
readyStateToCStr(aState)));
|
|
|
|
mReadyState = aState;
|
|
}
|
|
|
|
void DataChannel::SendMsg(const nsACString& aMsg, ErrorResult& aRv) {
|
|
if (!EnsureValidStream(aRv)) {
|
|
return;
|
|
}
|
|
|
|
SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aMsg.Length(),
|
|
aRv);
|
|
if (!aRv.Failed()) {
|
|
IncrementBufferedAmount(aMsg.Length(), aRv);
|
|
}
|
|
}
|
|
|
|
void DataChannel::SendBinaryMsg(const nsACString& aMsg, ErrorResult& aRv) {
|
|
if (!EnsureValidStream(aRv)) {
|
|
return;
|
|
}
|
|
|
|
SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg),
|
|
aMsg.Length(), aRv);
|
|
if (!aRv.Failed()) {
|
|
IncrementBufferedAmount(aMsg.Length(), aRv);
|
|
}
|
|
}
|
|
|
|
void DataChannel::SendBinaryBlob(dom::Blob& aBlob, ErrorResult& aRv) {
|
|
if (!EnsureValidStream(aRv)) {
|
|
return;
|
|
}
|
|
|
|
uint64_t msgLength = aBlob.GetSize(aRv);
|
|
if (aRv.Failed()) {
|
|
return;
|
|
}
|
|
|
|
if (msgLength > UINT32_MAX) {
|
|
aRv.Throw(NS_ERROR_FILE_TOO_BIG);
|
|
return;
|
|
}
|
|
|
|
// We convert to an nsIInputStream here, because Blob is not threadsafe, and
|
|
// we don't convert it earlier because we need to know how large this is so we
|
|
// can update bufferedAmount.
|
|
nsCOMPtr<nsIInputStream> msgStream;
|
|
aBlob.CreateInputStream(getter_AddRefs(msgStream), aRv);
|
|
if (NS_WARN_IF(aRv.Failed())) {
|
|
return;
|
|
}
|
|
|
|
SendErrnoToErrorResult(mConnection->SendBlob(mStream, msgStream), msgLength,
|
|
aRv);
|
|
if (!aRv.Failed()) {
|
|
IncrementBufferedAmount(msgLength, aRv);
|
|
}
|
|
}
|
|
|
|
dom::Nullable<uint16_t> DataChannel::GetMaxPacketLifeTime() const {
|
|
if (mPrPolicy == SCTP_PR_SCTP_TTL) {
|
|
return dom::Nullable<uint16_t>(mPrValue);
|
|
}
|
|
return dom::Nullable<uint16_t>();
|
|
}
|
|
|
|
dom::Nullable<uint16_t> DataChannel::GetMaxRetransmits() const {
|
|
if (mPrPolicy == SCTP_PR_SCTP_RTX) {
|
|
return dom::Nullable<uint16_t>(mPrValue);
|
|
}
|
|
return dom::Nullable<uint16_t>();
|
|
}
|
|
|
|
uint32_t DataChannel::GetBufferedAmountLowThreshold() const {
|
|
return mBufferedThreshold;
|
|
}
|
|
|
|
// Never fire immediately, as it's defined to fire on transitions, not state
|
|
void DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) {
|
|
mBufferedThreshold = aThreshold;
|
|
}
|
|
|
|
// Called with mLock locked!
|
|
void DataChannel::SendOrQueue(DataChannelOnMessageAvailable* aMessage) {
|
|
nsCOMPtr<nsIRunnable> runnable = aMessage;
|
|
mMainThreadEventTarget->Dispatch(runnable.forget());
|
|
}
|
|
|
|
DataChannel::TrafficCounters DataChannel::GetTrafficCounters() const {
|
|
MutexAutoLock lock(mStatsLock);
|
|
return mTrafficCounters;
|
|
}
|
|
|
|
bool DataChannel::EnsureValidStream(ErrorResult& aRv) {
|
|
MOZ_ASSERT(mConnection);
|
|
if (mConnection && mStream != INVALID_STREAM) {
|
|
return true;
|
|
}
|
|
aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
|
|
return false;
|
|
}
|
|
|
|
void DataChannel::WithTrafficCounters(
|
|
const std::function<void(TrafficCounters&)>& aFn) {
|
|
MutexAutoLock lock(mStatsLock);
|
|
aFn(mTrafficCounters);
|
|
}
|
|
|
|
} // namespace mozilla
|