Merge mozilla-central to inbound. a=merge CLOSED TREE

This commit is contained in:
Noemi Erli 2018-06-11 00:40:00 +03:00
Родитель 2eecfcbe24 777bf8b996
Коммит ad2118e866
44 изменённых файлов: 1067 добавлений и 1051 удалений

Просмотреть файл

@ -13,16 +13,11 @@
#include "mozilla/RefPtr.h"
static const char* sfLogTag = "SrtpFlow";
#ifdef LOGTAG
#undef LOGTAG
#endif
#define LOGTAG sfLogTag
using namespace mozilla;
namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
bool SrtpFlow::initialized; // Static
SrtpFlow::~SrtpFlow() {
@ -42,12 +37,12 @@ RefPtr<SrtpFlow> SrtpFlow::Create(int cipher_suite,
RefPtr<SrtpFlow> flow = new SrtpFlow();
if (!key) {
CSFLogError(LOGTAG, "Null SRTP key specified");
MOZ_MTLOG(ML_ERROR, "Null SRTP key specified");
return nullptr;
}
if (key_len != SRTP_TOTAL_KEY_LENGTH) {
CSFLogError(LOGTAG, "Invalid SRTP key length");
MOZ_MTLOG(ML_ERROR, "Invalid SRTP key length");
return nullptr;
}
@ -58,19 +53,19 @@ RefPtr<SrtpFlow> SrtpFlow::Create(int cipher_suite,
// since any flow can only have one cipher suite with DTLS-SRTP
switch (cipher_suite) {
case SRTP_AES128_CM_HMAC_SHA1_80:
CSFLogDebug(LOGTAG,
MOZ_MTLOG(ML_DEBUG,
"Setting SRTP cipher suite SRTP_AES128_CM_HMAC_SHA1_80");
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp);
break;
case SRTP_AES128_CM_HMAC_SHA1_32:
CSFLogDebug(LOGTAG,
MOZ_MTLOG(ML_DEBUG,
"Setting SRTP cipher suite SRTP_AES128_CM_HMAC_SHA1_32");
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_32(&policy.rtp);
srtp_crypto_policy_set_aes_cm_128_hmac_sha1_80(&policy.rtcp); // 80-bit per RFC 5764
break; // S 4.1.2.
default:
CSFLogError(LOGTAG, "Request to set unknown SRTP cipher suite");
MOZ_MTLOG(ML_ERROR, "Request to set unknown SRTP cipher suite");
return nullptr;
}
// This key is copied into the srtp_t object, so we don't
@ -87,7 +82,7 @@ RefPtr<SrtpFlow> SrtpFlow::Create(int cipher_suite,
// Now make the session
srtp_err_status_t r = srtp_create(&flow->session_, &policy);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Error creating srtp session");
MOZ_MTLOG(ML_ERROR, "Error creating srtp session");
return nullptr;
}
@ -99,30 +94,30 @@ nsresult SrtpFlow::CheckInputs(bool protect, void *in, int in_len,
int max_len, int *out_len) {
MOZ_ASSERT(in);
if (!in) {
CSFLogError(LOGTAG, "NULL input value");
MOZ_MTLOG(ML_ERROR, "NULL input value");
return NS_ERROR_NULL_POINTER;
}
if (in_len < 0) {
CSFLogError(LOGTAG, "Input length is negative");
MOZ_MTLOG(ML_ERROR, "Input length is negative");
return NS_ERROR_ILLEGAL_VALUE;
}
if (max_len < 0) {
CSFLogError(LOGTAG, "Max output length is negative");
MOZ_MTLOG(ML_ERROR, "Max output length is negative");
return NS_ERROR_ILLEGAL_VALUE;
}
if (protect) {
if ((max_len < SRTP_MAX_EXPANSION) ||
((max_len - SRTP_MAX_EXPANSION) < in_len)) {
CSFLogError(LOGTAG, "Output too short");
MOZ_MTLOG(ML_ERROR, "Output too short");
return NS_ERROR_ILLEGAL_VALUE;
}
}
else {
if (in_len > max_len) {
CSFLogError(LOGTAG, "Output too short");
MOZ_MTLOG(ML_ERROR, "Output too short");
return NS_ERROR_ILLEGAL_VALUE;
}
}
@ -140,7 +135,7 @@ nsresult SrtpFlow::ProtectRtp(void *in, int in_len,
srtp_err_status_t r = srtp_protect(session_, in, &len);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Error protecting SRTP packet");
MOZ_MTLOG(ML_ERROR, "Error protecting SRTP packet");
return NS_ERROR_FAILURE;
}
@ -148,8 +143,8 @@ nsresult SrtpFlow::ProtectRtp(void *in, int in_len,
*out_len = len;
CSFLogDebug(LOGTAG, "Successfully protected an SRTP packet of len %d",
*out_len);
MOZ_MTLOG(ML_DEBUG, "Successfully protected an SRTP packet of len "
<< *out_len);
return NS_OK;
}
@ -164,15 +159,15 @@ nsresult SrtpFlow::UnprotectRtp(void *in, int in_len,
srtp_err_status_t r = srtp_unprotect(session_, in, &len);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Error unprotecting SRTP packet error=%d", (int)r);
MOZ_MTLOG(ML_ERROR, "Error unprotecting SRTP packet error=" << (int)r);
return NS_ERROR_FAILURE;
}
MOZ_ASSERT(len <= max_len);
*out_len = len;
CSFLogDebug(LOGTAG, "Successfully unprotected an SRTP packet of len %d",
*out_len);
MOZ_MTLOG(ML_DEBUG, "Successfully unprotected an SRTP packet of len "
<< *out_len);
return NS_OK;
}
@ -187,15 +182,15 @@ nsresult SrtpFlow::ProtectRtcp(void *in, int in_len,
srtp_err_status_t r = srtp_protect_rtcp(session_, in, &len);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Error protecting SRTCP packet");
MOZ_MTLOG(ML_ERROR, "Error protecting SRTCP packet");
return NS_ERROR_FAILURE;
}
MOZ_ASSERT(len <= max_len);
*out_len = len;
CSFLogDebug(LOGTAG, "Successfully protected an SRTCP packet of len %d",
*out_len);
MOZ_MTLOG(ML_DEBUG, "Successfully protected an SRTCP packet of len "
<< *out_len);
return NS_OK;
}
@ -210,15 +205,15 @@ nsresult SrtpFlow::UnprotectRtcp(void *in, int in_len,
srtp_err_status_t r = srtp_unprotect_rtcp(session_, in, &len);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Error unprotecting SRTCP packet error=%d", (int)r);
MOZ_MTLOG(ML_ERROR, "Error unprotecting SRTCP packet error=" << (int)r);
return NS_ERROR_FAILURE;
}
MOZ_ASSERT(len <= max_len);
*out_len = len;
CSFLogDebug(LOGTAG, "Successfully unprotected an SRTCP packet of len %d",
*out_len);
MOZ_MTLOG(ML_DEBUG, "Successfully unprotected an SRTCP packet of len "
<< *out_len);
return NS_OK;
}
@ -233,14 +228,14 @@ nsresult SrtpFlow::Init() {
if (!initialized) {
srtp_err_status_t r = srtp_init();
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Could not initialize SRTP");
MOZ_MTLOG(ML_ERROR, "Could not initialize SRTP");
MOZ_ASSERT(PR_FALSE);
return NS_ERROR_FAILURE;
}
r = srtp_install_event_handler(&SrtpFlow::srtp_event_handler);
if (r != srtp_err_status_ok) {
CSFLogError(LOGTAG, "Could not install SRTP event handler");
MOZ_MTLOG(ML_ERROR, "Could not install SRTP event handler");
MOZ_ASSERT(PR_FALSE);
return NS_ERROR_FAILURE;
}

Просмотреть файл

@ -9,6 +9,7 @@ include("/ipc/chromium/chromium-config.mozbuild")
EXPORTS.mtransport += [
'../dtlsidentity.h',
'../m_cpp_utils.h',
'../mediapacket.h',
'../nricectx.h',
'../nricemediastream.h',
'../nriceresolverfake.h',
@ -17,6 +18,7 @@ EXPORTS.mtransport += [
'../runnable_utils.h',
'../sigslot.h',
'../simpletokenbucket.h',
'../SrtpFlow.h',
'../stun_socket_filter.h',
'../transportflow.h',
'../transportlayer.h',
@ -24,6 +26,7 @@ EXPORTS.mtransport += [
'../transportlayerice.h',
'../transportlayerlog.h',
'../transportlayerloopback.h',
'../transportlayersrtp.h',
]
include('../common.build')

Просмотреть файл

@ -6,6 +6,7 @@
mtransport_lcppsrcs = [
'dtlsidentity.cpp',
'mediapacket.cpp',
'nr_socket_prsock.cpp',
'nr_timer.cpp',
'nricectx.cpp',
@ -17,6 +18,7 @@ mtransport_lcppsrcs = [
'nrinterfaceprioritizer.cpp',
'rlogconnector.cpp',
'simpletokenbucket.cpp',
'SrtpFlow.cpp',
'stun_socket_filter.cpp',
'test_nr_socket.cpp',
'transportflow.cpp',
@ -25,6 +27,7 @@ mtransport_lcppsrcs = [
'transportlayerice.cpp',
'transportlayerlog.cpp',
'transportlayerloopback.cpp',
'transportlayersrtp.cpp',
]
mtransport_cppsrcs = [
@ -47,6 +50,8 @@ LOCAL_INCLUDES += [
'/media/mtransport/third_party/nrappkit/src/share',
'/media/mtransport/third_party/nrappkit/src/stats',
'/media/mtransport/third_party/nrappkit/src/util/libekr',
'/netwerk/srtp/src/crypto/include',
'/netwerk/srtp/src/include',
]
if CONFIG['OS_TARGET'] in ['Darwin', 'DragonFly', 'FreeBSD', 'NetBSD', 'OpenBSD']:

Просмотреть файл

@ -1,74 +0,0 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#ifndef databuffer_h__
#define databuffer_h__
#include <algorithm>
#include <mozilla/UniquePtr.h>
#include <m_cpp_utils.h>
#include <nsISupportsImpl.h>
namespace mozilla {
class DataBuffer {
public:
DataBuffer() : data_(nullptr), len_(0), capacity_(0) {}
DataBuffer(const uint8_t *data, size_t len) {
Assign(data, len, len);
}
DataBuffer(const uint8_t *data, size_t len, size_t capacity) {
Assign(data, len, capacity);
}
// to ensure extra space for expansion
void Assign(const uint8_t *data, size_t len, size_t capacity) {
MOZ_RELEASE_ASSERT(len <= capacity);
Allocate(capacity); // sets len_ = capacity
memcpy(static_cast<void *>(data_.get()),
static_cast<const void *>(data), len);
len_ = len;
}
void Allocate(size_t capacity) {
data_.reset(new uint8_t[capacity ? capacity : 1]); // Don't depend on new [0].
len_ = capacity_ = capacity;
}
void EnsureCapacity(size_t capacity) {
if (capacity_ < capacity) {
uint8_t *new_data = new uint8_t[ capacity ? capacity : 1];
memcpy(static_cast<void *>(new_data),
static_cast<const void *>(data_.get()), len_);
data_.reset(new_data); // after copying! Deletes old data
capacity_ = capacity;
}
}
// used when something writes to the buffer (having checked
// capacity() or used EnsureCapacity()) and increased the length.
void SetLength(size_t len) {
MOZ_RELEASE_ASSERT(len <= capacity_);
len_ = len;
}
const uint8_t *data() const { return data_.get(); }
uint8_t *data() { return data_.get(); }
size_t len() const { return len_; }
size_t capacity() const { return capacity_; }
private:
UniquePtr<uint8_t[]> data_;
size_t len_;
size_t capacity_;
DISALLOW_COPY_ASSIGN(DataBuffer);
};
}
#endif

Просмотреть файл

@ -0,0 +1,26 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "mediapacket.h"
#include <cstring>
namespace mozilla {
void
MediaPacket::Copy(const uint8_t* data, size_t len, size_t capacity)
{
if (capacity < len) {
capacity = len;
}
data_.reset(new uint8_t[capacity]);
len_ = len;
capacity_ = capacity;
memcpy(data_.get(), data, len);
}
} // namespace mozilla

Просмотреть файл

@ -0,0 +1,117 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef mediapacket_h__
#define mediapacket_h__
#include <cstddef>
#include <cstdint>
#include "mozilla/UniquePtr.h"
#include "mozilla/Maybe.h"
namespace mozilla {
// TODO: It might be worthwhile to teach this class how to "borrow" a buffer.
// That would make it easier to misuse, however, so maybe not worth it.
class MediaPacket {
public:
MediaPacket() = default;
MediaPacket(MediaPacket&& orig) = default;
// Takes ownership of the passed-in data
void Take(UniquePtr<uint8_t[]>&& data, size_t len, size_t capacity=0)
{
data_ = std::move(data);
len_ = len;
if (capacity < len) {
capacity = len;
}
capacity_ = capacity;
}
void Reset()
{
data_.reset();
len_ = 0;
capacity_ = 0;
}
// Copies the passed-in data
void Copy(const uint8_t* data, size_t len, size_t capacity=0);
uint8_t* data() const
{
return data_.get();
}
size_t len() const
{
return len_;
}
void SetLength(size_t length)
{
len_ = length;
}
size_t capacity() const
{
return capacity_;
}
Maybe<size_t>& sdp_level()
{
return sdp_level_;
}
void CopyDataToEncrypted()
{
encrypted_data_ = std::move(data_);
encrypted_len_ = len_;
Copy(encrypted_data_.get(), len_);
}
const uint8_t* encrypted_data() const
{
return encrypted_data_.get();
}
size_t encrypted_len() const
{
return encrypted_len_;
}
enum Type {
UNCLASSIFIED,
RTP,
RTCP,
SCTP
};
void SetType(Type type)
{
type_ = type;
}
Type type() const
{
return type_;
}
private:
UniquePtr<uint8_t[]> data_;
size_t len_ = 0;
size_t capacity_ = 0;
// Encrypted form of the data, if there is one.
UniquePtr<uint8_t[]> encrypted_data_;
size_t encrypted_len_ = 0;
// SDP level that this packet belongs to, if known.
Maybe<size_t> sdp_level_;
Type type_ = UNCLASSIFIED;
};
}
#endif // mediapacket_h__

Просмотреть файл

@ -1185,7 +1185,8 @@ NS_IMETHODIMP NrUdpSocketIpc::CallListenerReceivedData(const nsACString &host,
}
}
nsAutoPtr<DataBuffer> buf(new DataBuffer(data, data_length));
nsAutoPtr<MediaPacket> buf(new MediaPacket);
buf->Copy(data, data_length);
RefPtr<nr_udp_message> msg(new nr_udp_message(addr, buf));
RUN_ON_THREAD(sts_thread_,
@ -1376,7 +1377,8 @@ int NrUdpSocketIpc::sendto(const void *msg, size_t len, int flags,
return R_WOULDBLOCK;
}
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t*>(msg), len));
nsAutoPtr<MediaPacket> buf(new MediaPacket);
buf->Copy(static_cast<const uint8_t*>(msg), len);
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(RefPtr<NrUdpSocketIpc>(this),
@ -1594,7 +1596,7 @@ void NrUdpSocketIpc::connect_i(const nsACString &host, const uint16_t port) {
}
void NrUdpSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) {
void NrUdpSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<MediaPacket> buf) {
ASSERT_ON_THREAD(io_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
@ -1751,7 +1753,8 @@ NS_IMETHODIMP NrTcpSocketIpc::FireDataArrayEvent(const nsAString& aType,
// Called when we received data.
uint8_t *buf = const_cast<uint8_t*>(buffer.Elements());
nsAutoPtr<DataBuffer> data_buf(new DataBuffer(buf, buffer.Length()));
nsAutoPtr<MediaPacket> data_buf(new MediaPacket);
data_buf->Copy(buf, buffer.Length());
RefPtr<nr_tcp_message> msg = new nr_tcp_message(data_buf);
RUN_ON_THREAD(sts_thread_,

Просмотреть файл

@ -62,7 +62,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "nsThreadUtils.h"
#include "nsITCPSocketCallback.h"
#include "databuffer.h"
#include "mediapacket.h"
#include "m_cpp_utils.h"
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/RefPtr.h"
@ -197,14 +197,14 @@ protected:
};
struct nr_udp_message {
nr_udp_message(const PRNetAddr &from, nsAutoPtr<DataBuffer> &data)
nr_udp_message(const PRNetAddr &from, nsAutoPtr<MediaPacket> &data)
: from(from), data(data) {
}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nr_udp_message);
PRNetAddr from;
nsAutoPtr<DataBuffer> data;
nsAutoPtr<MediaPacket> data;
private:
~nr_udp_message() {}
@ -277,7 +277,7 @@ private:
// Main or private thread executors of the NrSocketBase APIs
void create_i(const nsACString &host, const uint16_t port);
void connect_i(const nsACString &host, const uint16_t port);
void sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf);
void sendto_i(const net::NetAddr &addr, nsAutoPtr<MediaPacket> buf);
void close_i();
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
static void destroy_i(nsIUDPSocketChild* aChild,
@ -312,7 +312,7 @@ private:
};
struct nr_tcp_message {
explicit nr_tcp_message(nsAutoPtr<DataBuffer> &data)
explicit nr_tcp_message(nsAutoPtr<MediaPacket> &data)
: read_bytes(0)
, data(data) {
}
@ -333,7 +333,7 @@ private:
~nr_tcp_message() {}
DISALLOW_COPY_ASSIGN(nr_tcp_message);
nsAutoPtr<DataBuffer> data;
nsAutoPtr<MediaPacket> data;
};
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)

Просмотреть файл

@ -20,7 +20,6 @@ extern "C" {
#include "stun.h"
}
#include "databuffer.h"
#include "dummysocket.h"
#include "nr_socket_prsock.h"

Просмотреть файл

@ -15,7 +15,7 @@ extern "C" {
#include "transport_addr.h"
}
#include "databuffer.h"
#include "mediapacket.h"
#include "mozilla/UniquePtr.h"
#define GTEST_HAS_RTTI 0
@ -24,14 +24,14 @@ extern "C" {
namespace mozilla {
static UniquePtr<DataBuffer> merge(UniquePtr<DataBuffer> a, UniquePtr<DataBuffer> b) {
static UniquePtr<MediaPacket> merge(UniquePtr<MediaPacket> a, UniquePtr<MediaPacket> b) {
if (a && a->len() && b && b->len()) {
UniquePtr<DataBuffer> merged(new DataBuffer());
merged->Allocate(a->len() + b->len());
memcpy(merged->data(), a->data(), a->len());
memcpy(merged->data() + a->len(), b->data(), b->len());
UniquePtr<uint8_t[]> data(new uint8_t[a->len() + b->len()]);
memcpy(data.get(), a->data(), a->len());
memcpy(data.get() + a->len(), b->data(), b->len());
UniquePtr<MediaPacket> merged(new MediaPacket);
merged->Take(std::move(data), a->len() + b->len());
return merged;
}
@ -100,7 +100,8 @@ class DummySocket : public NrSocketBase {
size_t to_write = std::min(len, writable_);
if (to_write) {
UniquePtr<DataBuffer> msgbuf(new DataBuffer(static_cast<const uint8_t *>(msg), to_write));
UniquePtr<MediaPacket> msgbuf(new MediaPacket);
msgbuf->Copy(static_cast<const uint8_t *>(msg), to_write);
write_buffer_ = merge(std::move(write_buffer_), std::move(msgbuf));
}
@ -121,8 +122,10 @@ class DummySocket : public NrSocketBase {
*len = to_read;
if (to_read < read_buffer_->len()) {
read_buffer_.reset(new DataBuffer(read_buffer_->data() + to_read,
read_buffer_->len() - to_read));
MediaPacket* newPacket = new MediaPacket;
newPacket->Copy(read_buffer_->data() + to_read,
read_buffer_->len() - to_read);
read_buffer_.reset(newPacket);
} else {
read_buffer_.reset();
}
@ -180,7 +183,8 @@ class DummySocket : public NrSocketBase {
void SetReadBuffer(const uint8_t *data, size_t len) {
EXPECT_EQ(nullptr, write_buffer_.get());
read_buffer_.reset(new DataBuffer(data, len));
read_buffer_.reset(new MediaPacket);
read_buffer_->Copy(data, len);
}
void ClearReadBuffer() {
@ -214,9 +218,9 @@ class DummySocket : public NrSocketBase {
DISALLOW_COPY_ASSIGN(DummySocket);
size_t writable_; // Amount we allow someone to write.
UniquePtr<DataBuffer> write_buffer_;
UniquePtr<MediaPacket> write_buffer_;
size_t readable_; // Amount we allow someone to read.
UniquePtr<DataBuffer> read_buffer_;
UniquePtr<MediaPacket> read_buffer_;
NR_async_cb cb_;
void *cb_arg_;

Просмотреть файл

@ -20,7 +20,6 @@ extern "C" {
#include "stun.h"
}
#include "databuffer.h"
#include "dummysocket.h"
#include "nr_socket_prsock.h"

Просмотреть файл

@ -139,10 +139,10 @@ class TransportTestPeer : public sigslot::has_slots<> {
void ConnectSocket_s(TransportTestPeer *peer) {
loopback_->Connect(peer->loopback_);
ASSERT_EQ((nsresult)NS_OK, loopback_->Init());
flow_->PushLayer(loopback_);
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
loopback_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
// SCTP here!
ASSERT_TRUE(sctp_);
@ -158,6 +158,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
}
void Disconnect_s() {
disconnect_all();
if (flow_) {
flow_ = nullptr;
}
@ -198,16 +199,15 @@ class TransportTestPeer : public sigslot::has_slots<> {
int received() const { return received_; }
bool connected() const { return connected_; }
static TransportResult SendPacket_s(const unsigned char* data, size_t len,
const RefPtr<TransportFlow>& flow) {
TransportResult res = flow->SendPacket(data, len);
delete data; // we always allocate
return res;
static TransportResult SendPacket_s(nsAutoPtr<MediaPacket> packet,
const RefPtr<TransportFlow>& flow,
TransportLayer* layer) {
return layer->SendPacket(*packet);
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
unsigned char *buffer = new unsigned char[len];
memcpy(buffer, data, len);
nsAutoPtr<MediaPacket> packet(new MediaPacket);
packet->Copy(data, len);
// Uses DISPATCH_NORMAL to avoid possible deadlocks when we're called
// from MainThread especially during shutdown (same as DataChannels).
@ -216,19 +216,18 @@ class TransportTestPeer : public sigslot::has_slots<> {
// a refptr to flow_ to avoid any async deletion issues (since we can't
// make 'this' into a refptr as it isn't refcounted)
RUN_ON_THREAD(test_utils_->sts_target(), WrapRunnableNM(
&TransportTestPeer::SendPacket_s, buffer, len, flow_),
&TransportTestPeer::SendPacket_s, packet, flow_, loopback_),
NS_DISPATCH_NORMAL);
return 0;
}
void PacketReceived(TransportFlow * flow, const unsigned char* data,
size_t len) {
std::cerr << "Received " << len << " bytes" << std::endl;
void PacketReceived(TransportLayer * layer, MediaPacket& packet) {
std::cerr << "Received " << packet.len() << " bytes" << std::endl;
// Pass the data to SCTP
usrsctp_conninput(static_cast<void *>(this), data, len, 0);
usrsctp_conninput(static_cast<void *>(this), packet.data(), packet.len(), 0);
}
// Process SCTP notification
@ -288,6 +287,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
bool connected_;
size_t sent_;
size_t received_;
// Owns the TransportLayerLoopback, but basically does nothing else.
RefPtr<TransportFlow> flow_;
TransportLayerLoopback *loopback_;

Просмотреть файл

@ -80,7 +80,7 @@ nrappkit copyright:
#include "logging.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/Unused.h"
#include "databuffer.h"
#include "mediapacket.h"
// mozilla/utils.h defines this as well
#ifdef UNIMPLEMENTED
@ -392,13 +392,14 @@ struct DeferredStunOperation {
nr_transport_addr *addr,
nr_socket *sock) :
server_(server),
buffer_(reinterpret_cast<const uint8_t *>(data), len),
buffer_(),
sock_(sock) {
buffer_.Copy(reinterpret_cast<const uint8_t *>(data), len);
nr_transport_addr_copy(&addr_, addr);
}
TestStunServer *server_;
DataBuffer buffer_;
MediaPacket buffer_;
nr_transport_addr addr_;
nr_socket *sock_;
};

Просмотреть файл

@ -24,7 +24,7 @@
#include "nsThreadUtils.h"
#include "nsXPCOM.h"
#include "databuffer.h"
#include "mediapacket.h"
#include "dtlsidentity.h"
#include "nricectxhandler.h"
#include "nricemediastream.h"
@ -76,7 +76,7 @@ class TransportLayerDummy : public TransportLayer {
return allow_init_ ? NS_OK : NS_ERROR_FAILURE;
}
TransportResult SendPacket(const unsigned char *data, size_t len) override {
TransportResult SendPacket(MediaPacket& packet) override {
MOZ_CRASH(); // Should never be called.
return 0;
}
@ -102,21 +102,21 @@ class TransportLayerLossy : public TransportLayer {
TransportLayerLossy() : loss_mask_(0), packet_(0), inspector_(nullptr) {}
~TransportLayerLossy () {}
TransportResult SendPacket(const unsigned char *data, size_t len) override {
MOZ_MTLOG(ML_NOTICE, LAYER_INFO << "SendPacket(" << len << ")");
TransportResult SendPacket(MediaPacket& packet) override {
MOZ_MTLOG(ML_NOTICE, LAYER_INFO << "SendPacket(" << packet.len() << ")");
if (loss_mask_ & (1 << (packet_ % 32))) {
MOZ_MTLOG(ML_NOTICE, "Dropping packet");
++packet_;
return len;
return packet.len();
}
if (inspector_) {
inspector_->Inspect(this, data, len);
inspector_->Inspect(this, packet.data(), packet.len());
}
++packet_;
return downward_->SendPacket(data, len);
return downward_->SendPacket(packet);
}
void SetLoss(uint32_t packet) {
@ -131,9 +131,8 @@ class TransportLayerLossy : public TransportLayer {
TL_SET_STATE(state);
}
void PacketReceived(TransportLayer *layer, const unsigned char *data,
size_t len) {
SignalPacketReceived(this, data, len);
void PacketReceived(TransportLayer *layer, MediaPacket& packet) {
SignalPacketReceived(this, packet);
}
TRANSPORT_LAYER_ID("lossy")
@ -166,7 +165,9 @@ class TransportLayerLossy : public TransportLayer {
class TlsParser {
public:
TlsParser(const unsigned char *data, size_t len)
: buffer_(data, len), offset_(0) {}
: buffer_(), offset_(0) {
buffer_.Copy(data, len);
}
bool Read(unsigned char* val) {
if (remaining() < 1) {
@ -214,16 +215,18 @@ class TlsParser {
const uint8_t *ptr() const { return buffer_.data() + offset_; }
void consume(size_t len) { offset_ += len; }
DataBuffer buffer_;
MediaPacket buffer_;
size_t offset_;
};
class DtlsRecordParser {
public:
DtlsRecordParser(const unsigned char *data, size_t len)
: buffer_(data, len), offset_(0) {}
: buffer_(), offset_(0) {
buffer_.Copy(data, len);
}
bool NextRecord(uint8_t* ct, nsAutoPtr<DataBuffer>* buffer) {
bool NextRecord(uint8_t* ct, nsAutoPtr<MediaPacket>* buffer) {
if (!remaining())
return false;
@ -236,7 +239,8 @@ class DtlsRecordParser {
consume(2);
CHECK_LENGTH(length);
DataBuffer* db = new DataBuffer(ptr(), length);
MediaPacket* db = new MediaPacket;
db->Copy(ptr(), length);
consume(length);
*ct = *ctp;
@ -250,7 +254,7 @@ class DtlsRecordParser {
const uint8_t *ptr() const { return buffer_.data() + offset_; }
void consume(size_t len) { offset_ += len; }
DataBuffer buffer_;
MediaPacket buffer_;
size_t offset_;
};
@ -264,7 +268,7 @@ class DtlsRecordInspector : public Inspector {
DtlsRecordParser parser(data, len);
uint8_t ct;
nsAutoPtr<DataBuffer> buf;
nsAutoPtr<MediaPacket> buf;
while(parser.NextRecord(&ct, &buf)) {
OnRecord(layer, ct, buf->data(), buf->len());
}
@ -281,20 +285,17 @@ class DtlsRecordInspector : public Inspector {
class DtlsInspectorInjector : public DtlsRecordInspector {
public:
DtlsInspectorInjector(uint8_t packet_type, uint8_t handshake_type,
const unsigned char *data, size_t len) :
const unsigned char *data, size_t len) :
packet_type_(packet_type),
handshake_type_(handshake_type),
injected_(false) {
data_.reset(new unsigned char[len]);
memcpy(data_.get(), data, len);
len_ = len;
handshake_type_(handshake_type) {
packet_.Copy(data, len);
}
virtual void OnRecord(TransportLayer* layer,
uint8_t content_type,
const unsigned char *data, size_t len) {
// Only inject once.
if (injected_) {
if (!packet_.data()) {
return;
}
@ -315,15 +316,14 @@ class DtlsInspectorInjector : public DtlsRecordInspector {
}
}
layer->SendPacket(data_.get(), len_);
layer->SendPacket(packet_);
packet_.Reset();
}
private:
uint8_t packet_type_;
uint8_t handshake_type_;
bool injected_;
UniquePtr<unsigned char[]> data_;
size_t len_;
MediaPacket packet_;
};
// Make a copy of the first instance of a message.
@ -382,17 +382,18 @@ class DtlsInspectorRecordHandshakeMessage : public DtlsRecordInspector {
return;
}
buffer_.Allocate(length);
if (!parser.Read(buffer_.data(), length)) {
UniquePtr<uint8_t[]> buffer(new uint8_t[length]);
if (!parser.Read(buffer.get(), length)) {
return;
}
buffer_.Take(std::move(buffer), length);
}
const DataBuffer& buffer() { return buffer_; }
const MediaPacket& buffer() { return buffer_; }
private:
uint8_t handshake_type_;
DataBuffer buffer_;
MediaPacket buffer_;
};
class TlsServerKeyExchangeECDHE {
@ -419,15 +420,16 @@ class TlsServerKeyExchangeECDHE {
return false;
}
public_key_.Allocate(point_length);
if (!parser.Read(public_key_.data(), point_length)) {
UniquePtr<uint8_t[]> key(new uint8_t[point_length]);
if (!parser.Read(key.get(), point_length)) {
return false;
}
public_key_.Take(std::move(key), point_length);
return true;
}
DataBuffer public_key_;
MediaPacket public_key_;
};
namespace {
@ -552,11 +554,18 @@ class TransportTestPeer : public sigslot::has_slots<> {
ASSERT_EQ((nsresult)NS_OK, res);
loopback_->Connect(peer->loopback_);
ASSERT_EQ((nsresult)NS_OK, loopback_->Init());
ASSERT_EQ((nsresult)NS_OK, logging_->Init());
ASSERT_EQ((nsresult)NS_OK, lossy_->Init());
ASSERT_EQ((nsresult)NS_OK, dtls_->Init());
dtls_->Chain(lossy_);
lossy_->Chain(logging_);
logging_->Chain(loopback_);
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(loopback_));
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(logging_));
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(lossy_));
ASSERT_EQ((nsresult)NS_OK, flow_->PushLayer(dtls_));
flow_->PushLayer(loopback_);
flow_->PushLayer(logging_);
flow_->PushLayer(lossy_);
flow_->PushLayer(dtls_);
if (dtls_->state() != TransportLayer::TS_ERROR) {
// Don't execute these blocks if DTLS didn't initialize.
@ -573,7 +582,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
}
}
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
dtls_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
}
void TweakCiphers(PRFileDesc* fd) {
@ -592,6 +601,17 @@ class TransportTestPeer : public sigslot::has_slots<> {
NS_DISPATCH_SYNC);
}
nsresult InitIce_s() {
nsresult rv = ice_->Init();
NS_ENSURE_SUCCESS(rv, rv);
rv = dtls_->Init();
NS_ENSURE_SUCCESS(rv, rv);
dtls_->Chain(ice_);
flow_->PushLayer(ice_);
flow_->PushLayer(dtls_);
return NS_OK;
}
void InitIce() {
nsresult res;
@ -619,21 +639,15 @@ class TransportTestPeer : public sigslot::has_slots<> {
ice_ = new TransportLayerIce();
ice_->SetParameters(stream, 1);
// Assemble the stack
nsAutoPtr<std::queue<mozilla::TransportLayer *> > layers(
new std::queue<mozilla::TransportLayer *>);
layers->push(ice_);
layers->push(dtls_);
test_utils_->sts_target()->Dispatch(
WrapRunnableRet(&res, flow_, &TransportFlow::PushLayers, layers),
WrapRunnableRet(&res, this, &TransportTestPeer::InitIce_s),
NS_DISPATCH_SYNC);
ASSERT_EQ((nsresult)NS_OK, res);
// Listen for media events
flow_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
flow_->SignalStateChange.connect(this, &TransportTestPeer::StateChanged);
dtls_->SignalPacketReceived.connect(this, &TransportTestPeer::PacketReceived);
dtls_->SignalStateChange.connect(this, &TransportTestPeer::StateChanged);
// Start gathering
test_utils_->sts_target()->Dispatch(
@ -703,27 +717,37 @@ class TransportTestPeer : public sigslot::has_slots<> {
ASSERT_TRUE(NS_SUCCEEDED(res));
}
TransportResult SendPacket(const unsigned char* data, size_t len) {
// WrapRunnable/lambda and move semantics (MediaPacket is not copyable) don't
// get along yet, so we need a wrapper. Gross.
static TransportResult SendPacketWrapper(TransportLayer* layer,
MediaPacket* packet) {
return layer->SendPacket(*packet);
}
TransportResult SendPacket(MediaPacket& packet) {
TransportResult ret;
test_utils_->sts_target()->Dispatch(
WrapRunnableRet(&ret, flow_, &TransportFlow::SendPacket, data, len),
WrapRunnableNMRet(&ret,
&TransportTestPeer::SendPacketWrapper,
dtls_,
&packet),
NS_DISPATCH_SYNC);
return ret;
}
void StateChanged(TransportFlow *flow, TransportLayer::State state) {
void StateChanged(TransportLayer *layer, TransportLayer::State state) {
if (state == TransportLayer::TS_OPEN) {
std::cerr << "Now connected" << std::endl;
}
}
void PacketReceived(TransportFlow * flow, const unsigned char* data,
size_t len) {
std::cerr << "Received " << len << " bytes" << std::endl;
void PacketReceived(TransportLayer* layer, MediaPacket& packet) {
std::cerr << "Received " << packet.len() << " bytes" << std::endl;
++received_packets_;
received_bytes_ += len;
received_bytes_ += packet.len();
}
void SetLoss(uint32_t loss) {
@ -758,7 +782,7 @@ class TransportTestPeer : public sigslot::has_slots<> {
TransportLayer::State tstate;
RUN_ON_THREAD(test_utils_->sts_target(),
WrapRunnableRet(&tstate, flow_, &TransportFlow::state));
WrapRunnableRet(&tstate, dtls_, &TransportLayer::state));
return tstate;
}
@ -938,7 +962,9 @@ class TransportTest : public MtransportTest {
for (size_t i= 0; i<count; ++i) {
memset(buf, count & 0xff, sizeof(buf));
TransportResult rv = p1_->SendPacket(buf, sizeof(buf));
MediaPacket packet;
packet.Copy(buf, sizeof(buf));
TransportResult rv = p1_->SendPacket(packet);
ASSERT_TRUE(rv > 0);
}
@ -1291,56 +1317,4 @@ TEST_F(TransportTest, TestDheOnlyFails) {
ConnectSocketExpectFail();
}
TEST(PushTests, LayerFail) {
RefPtr<TransportFlow> flow = new TransportFlow();
nsresult rv;
bool destroyed1, destroyed2;
rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
rv = flow->PushLayer(new TransportLayerDummy(false, &destroyed2));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow->state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed1);
}
TEST(PushTests, LayersFail) {
RefPtr<TransportFlow> flow = new TransportFlow();
nsresult rv;
bool destroyed1, destroyed2, destroyed3;
rv = flow->PushLayer(new TransportLayerDummy(true, &destroyed1));
ASSERT_TRUE(NS_SUCCEEDED(rv));
nsAutoPtr<std::queue<TransportLayer *> > layers(
new std::queue<TransportLayer *>());
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(false, &destroyed3));
rv = flow->PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(TransportLayer::TS_ERROR, flow->state());
ASSERT_EQ(true, destroyed1);
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
layers = new std::queue<TransportLayer *>();
layers->push(new TransportLayerDummy(true, &destroyed2));
layers->push(new TransportLayerDummy(true, &destroyed3));
rv = flow->PushLayers(layers);
ASSERT_TRUE(NS_FAILED(rv));
ASSERT_EQ(true, destroyed2);
ASSERT_EQ(true, destroyed3);
}
} // end namespace

Просмотреть файл

@ -101,6 +101,7 @@ extern "C" {
#include "mozilla/UniquePtr.h"
#include "prinrval.h"
#include "mediapacket.h"
namespace mozilla {
@ -241,14 +242,15 @@ class TestNrSocket : public NrSocketBase {
class UdpPacket {
public:
UdpPacket(const void *msg, size_t len, const nr_transport_addr &addr) :
buffer_(new DataBuffer(static_cast<const uint8_t*>(msg), len)) {
buffer_(new MediaPacket) {
buffer_->Copy(static_cast<const uint8_t*>(msg), len);
// TODO(bug 1170299): Remove const_cast when no longer necessary
nr_transport_addr_copy(&remote_address_,
const_cast<nr_transport_addr*>(&addr));
}
nr_transport_addr remote_address_;
UniquePtr<DataBuffer> buffer_;
UniquePtr<MediaPacket> buffer_;
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(UdpPacket);
private:
@ -290,14 +292,15 @@ class TestNrSocket : public NrSocketBase {
nr_transport_addr *addr,
RefPtr<NrSocketBase> internal_socket) :
socket_(sock),
buffer_(reinterpret_cast<const uint8_t *>(data), len),
buffer_(),
flags_(flags),
internal_socket_(internal_socket) {
buffer_.Copy(reinterpret_cast<const uint8_t *>(data), len);
nr_transport_addr_copy(&to_, addr);
}
TestNrSocket *socket_;
DataBuffer buffer_;
MediaPacket buffer_;
int flags_;
nr_transport_addr to_;
RefPtr<NrSocketBase> internal_socket_;

Просмотреть файл

@ -14,20 +14,11 @@
namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
NS_IMPL_ISUPPORTS0(TransportFlow)
// There are some hacks here to allow destruction off of
// the main thread.
TransportFlow::~TransportFlow() {
// Make sure that if we are off the right thread, we have
// no more attached signals.
if (!CheckThreadInt()) {
MOZ_ASSERT(SignalStateChange.is_empty());
MOZ_ASSERT(SignalPacketReceived.is_empty());
}
// Push the destruction onto the STS thread. Note that there
// is still some possibility that someone is accessing this
// object simultaneously, but as long as smart pointer discipline
@ -47,13 +38,6 @@ void TransportFlow::DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers
ClearLayers(layers.get());
}
void TransportFlow::ClearLayers(std::queue<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
layers->pop();
}
}
void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
while (!layers->empty()) {
delete layers->front();
@ -61,159 +45,26 @@ void TransportFlow::ClearLayers(std::deque<TransportLayer *>* layers) {
}
}
nsresult TransportFlow::PushLayer(TransportLayer *layer) {
void TransportFlow::PushLayer(TransportLayer* layer) {
CheckThread();
UniquePtr<TransportLayer> layer_tmp(layer); // Destroy on failure.
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(ML_ERROR, id_ + ": Can't call PushLayer in error state for flow");
return NS_ERROR_FAILURE;
}
nsresult rv = layer->Init();
if (!NS_SUCCEEDED(rv)) {
// Destroy the rest of the flow, because it's no longer in an acceptable
// state.
ClearLayers(layers_.get());
// Set ourselves to have failed.
MOZ_MTLOG(ML_ERROR, id_ << ": Layer initialization failed; invalidating");
StateChangeInt(TransportLayer::TS_ERROR);
return rv;
}
layers_->push_front(layer);
EnsureSameThread(layer);
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
// Re-target my signals to the new layer
if (old_layer) {
old_layer->SignalStateChange.disconnect(this);
old_layer->SignalPacketReceived.disconnect(this);
}
layers_->push_front(layer_tmp.release());
layer->Inserted(this, old_layer);
layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
StateChangeInt(layer->state());
return NS_OK;
}
// This is all-or-nothing.
nsresult TransportFlow::PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers) {
CheckThread();
MOZ_ASSERT(!layers->empty());
if (layers->empty()) {
MOZ_MTLOG(ML_ERROR, id_ << ": Can't call PushLayers with empty layers");
return NS_ERROR_INVALID_ARG;
}
// Don't allow pushes once we are in error state.
if (state_ == TransportLayer::TS_ERROR) {
MOZ_MTLOG(ML_ERROR,
id_ << ": Can't call PushLayers in error state for flow ");
ClearLayers(layers.get());
return NS_ERROR_FAILURE;
}
nsresult rv = NS_OK;
// Disconnect all the old signals.
disconnect_all();
TransportLayer *layer;
while (!layers->empty()) {
TransportLayer *old_layer = layers_->empty() ? nullptr : layers_->front();
layer = layers->front();
rv = layer->Init();
if (NS_FAILED(rv)) {
MOZ_MTLOG(ML_ERROR,
id_ << ": Layer initialization failed; invalidating flow ");
break;
}
EnsureSameThread(layer);
// Push the layer onto the queue.
layers_->push_front(layer);
layers->pop();
layer->Inserted(this, old_layer);
}
if (NS_FAILED(rv)) {
// Destroy any layers we could not push.
ClearLayers(layers.get());
// Now destroy the rest of the flow, because it's no longer
// in an acceptable state.
ClearLayers(layers_.get());
// Set ourselves to have failed.
StateChangeInt(TransportLayer::TS_ERROR);
// Return failure.
return rv;
}
// Finally, attach ourselves to the top layer.
layer->SignalStateChange.connect(this, &TransportFlow::StateChange);
layer->SignalPacketReceived.connect(this, &TransportFlow::PacketReceived);
StateChangeInt(layer->state()); // Signals if the state changes.
return NS_OK;
}
TransportLayer *TransportFlow::top() const {
CheckThread();
return layers_->empty() ? nullptr : layers_->front();
layer->SetFlowId(id_);
}
TransportLayer *TransportFlow::GetLayer(const std::string& id) const {
CheckThread();
for (std::deque<TransportLayer *>::const_iterator it = layers_->begin();
it != layers_->end(); ++it) {
if ((*it)->id() == id)
return *it;
if (layers_) {
for (TransportLayer* layer : *layers_) {
if (layer->id() == id)
return layer;
}
}
return nullptr;
}
TransportLayer::State TransportFlow::state() {
CheckThread();
return state_;
}
TransportResult TransportFlow::SendPacket(const unsigned char *data,
size_t len) {
CheckThread();
if (state_ != TransportLayer::TS_OPEN) {
return TE_ERROR;
}
return top() ? top()->SendPacket(data, len) : TE_ERROR;
}
bool TransportFlow::Contains(TransportLayer *layer) const {
if (layers_) {
for (auto& l : *layers_) {
if (l == layer) {
return true;
}
}
}
return false;
}
void TransportFlow::EnsureSameThread(TransportLayer *layer) {
// Enforce that if any of the layers have a thread binding,
// they all have the same binding.
@ -228,30 +79,4 @@ void TransportFlow::EnsureSameThread(TransportLayer *layer) {
}
}
void TransportFlow::StateChangeInt(TransportLayer::State state) {
CheckThread();
if (state == state_) {
return;
}
state_ = state;
SignalStateChange(this, state_);
}
void TransportFlow::StateChange(TransportLayer *layer,
TransportLayer::State state) {
CheckThread();
StateChangeInt(state);
}
void TransportFlow::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
CheckThread();
SignalPacketReceived(this, data, len);
}
} // close namespace

Просмотреть файл

@ -10,7 +10,6 @@
#define transportflow_h__
#include <deque>
#include <queue>
#include <string>
#include "nscore.h"
@ -49,16 +48,13 @@
namespace mozilla {
class TransportFlow final : public nsISupports,
public sigslot::has_slots<> {
class TransportFlow final : public nsISupports {
public:
TransportFlow()
: id_("(anonymous)"),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
explicit TransportFlow(const std::string id)
: id_(id),
state_(TransportLayer::TS_NONE),
layers_(new std::deque<TransportLayer *>) {}
const std::string& id() const { return id_; }
@ -70,33 +66,10 @@ class TransportFlow final : public nsISupports,
//
// The flow takes ownership of the layers after a successful
// push.
nsresult PushLayer(TransportLayer *layer);
void PushLayer(TransportLayer* layer);
// Convenience function to push multiple layers on. Layers
// are pushed on in the order that they are in the queue.
// Any failures cause the flow to become inoperable and
// destroys all the layers including those already pushed.
// TODO(ekr@rtfm.com): Change layers to be ref-counted.
nsresult PushLayers(nsAutoPtr<std::queue<TransportLayer *> > layers);
TransportLayer *top() const;
TransportLayer *GetLayer(const std::string& id) const;
// Wrappers for whatever TLayer happens to be the top layer
// at the time. This way you don't need to do top()->Foo().
TransportLayer::State state(); // Current state
TransportResult SendPacket(const unsigned char *data, size_t len);
// State has changed. Reflects the top flow.
sigslot::signal2<TransportFlow *, TransportLayer::State>
SignalStateChange;
// Data received on the flow
sigslot::signal3<TransportFlow*, const unsigned char *, size_t>
SignalPacketReceived;
bool Contains(TransportLayer *layer) const;
NS_DECL_THREADSAFE_ISUPPORTS
private:
@ -123,18 +96,12 @@ class TransportFlow final : public nsISupports,
void EnsureSameThread(TransportLayer *layer);
void StateChange(TransportLayer *layer, TransportLayer::State state);
void StateChangeInt(TransportLayer::State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
static void DestroyFinal(nsAutoPtr<std::deque<TransportLayer *> > layers);
// Overload needed because we use deque internally and queue externally.
static void ClearLayers(std::deque<TransportLayer *>* layers);
static void ClearLayers(std::queue<TransportLayer *>* layers);
std::string id_;
TransportLayer::State state_;
UniquePtr<std::deque<TransportLayer *>> layers_;
nsCOMPtr<nsIEventTarget> target_;
};

Просмотреть файл

@ -30,9 +30,8 @@ nsresult TransportLayer::Init() {
return NS_OK;
}
void TransportLayer::Inserted(TransportFlow *flow, TransportLayer *downward) {
void TransportLayer::Chain(TransportLayer *downward) {
downward_ = downward;
flow_id_ = flow->id();
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "Inserted: downward='" <<
(downward ? downward->id(): "none") << "'");

Просмотреть файл

@ -17,6 +17,7 @@
#include "nsIEventTarget.h"
#include "m_cpp_utils.h"
#include "mediapacket.h"
namespace mozilla {
@ -51,8 +52,9 @@ class TransportLayer : public sigslot::has_slots<> {
nsresult Init(); // Called by Insert() to set up -- do not override
virtual nsresult InitInternal() { return NS_OK; } // Called by Init
// Called when inserted into a flow
virtual void Inserted(TransportFlow *flow, TransportLayer *downward);
void SetFlowId(const std::string& flow_id) {flow_id_ = flow_id;}
virtual void Chain(TransportLayer *downward);
// Downward interface
TransportLayer *downward() { return downward_; }
@ -60,7 +62,7 @@ class TransportLayer : public sigslot::has_slots<> {
// Get the state
State state() const { return state_; }
// Must be implemented by derived classes
virtual TransportResult SendPacket(const unsigned char *data, size_t len) = 0;
virtual TransportResult SendPacket(MediaPacket& packet) = 0;
// Get the thread.
const nsCOMPtr<nsIEventTarget> GetThread() const {
@ -71,8 +73,7 @@ class TransportLayer : public sigslot::has_slots<> {
// State has changed
sigslot::signal2<TransportLayer*, State> SignalStateChange;
// Data received on the flow
sigslot::signal3<TransportLayer*, const unsigned char *, size_t>
SignalPacketReceived;
sigslot::signal2<TransportLayer*, MediaPacket&> SignalPacketReceived;
// Return the layer id for this layer
virtual const std::string id() const = 0;

Просмотреть файл

@ -15,7 +15,6 @@
#include "dtlsidentity.h"
#include "keyhi.h"
#include "logging.h"
#include "mozilla/Move.h"
#include "mozilla/Telemetry.h"
#include "mozilla/UniquePtr.h"
#include "mozilla/Unused.h"
@ -63,23 +62,10 @@ static PRDescIdentity transport_layer_identity = PR_INVALID_IO_LAYER;
//
// All of this stuff is assumed to happen solely in a single thread
// (generally the SocketTransportService thread)
struct Packet {
Packet() : data_(nullptr), len_(0) {}
void Assign(const void *data, int32_t len) {
data_.reset(new uint8_t[len]);
memcpy(data_.get(), data, len);
len_ = len;
}
UniquePtr<uint8_t[]> data_;
int32_t len_;
};
void TransportLayerNSPRAdapter::PacketReceived(const void *data, int32_t len) {
void TransportLayerNSPRAdapter::PacketReceived(MediaPacket& packet) {
if (enabled_) {
input_.push(new Packet());
input_.back()->Assign(data, len);
input_.push(new MediaPacket(std::move(packet)));
}
}
@ -89,15 +75,16 @@ int32_t TransportLayerNSPRAdapter::Recv(void *buf, int32_t buflen) {
return -1;
}
Packet* front = input_.front();
if (buflen < front->len_) {
MediaPacket* front = input_.front();
int32_t count = static_cast<int32_t>(front->len());
if (buflen < count) {
MOZ_ASSERT(false, "Not enough buffer space to receive into");
PR_SetError(PR_BUFFER_OVERFLOW_ERROR, 0);
return -1;
}
int32_t count = front->len_;
memcpy(buf, front->data_.get(), count);
memcpy(buf, front->data(), count);
input_.pop();
delete front;
@ -111,8 +98,11 @@ int32_t TransportLayerNSPRAdapter::Write(const void *buf, int32_t length) {
return -1;
}
TransportResult r = output_->SendPacket(
static_cast<const unsigned char *>(buf), length);
MediaPacket packet;
// Copies. Oh well.
packet.Copy(static_cast<const uint8_t*>(buf), static_cast<size_t>(length));
TransportResult r = output_->SendPacket(packet);
if (r >= 0) {
return r;
}
@ -989,10 +979,9 @@ bool TransportLayerDtls::CheckAlpn() {
void TransportLayerDtls::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
MediaPacket& packet) {
CheckThread();
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "PacketReceived(" << len << ")");
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "PacketReceived(" << packet.len() << ")");
if (state_ != TS_CONNECTING && state_ != TS_OPEN) {
MOZ_MTLOG(ML_DEBUG,
@ -1000,13 +989,23 @@ void TransportLayerDtls::PacketReceived(TransportLayer* layer,
return;
}
// not DTLS per RFC 7983
if (data[0] < 20 || data[0] > 63) {
if (!packet.data()) {
// Something ate this, probably the SRTP layer
return;
}
nspr_io_adapter_->PacketReceived(data, len);
// not DTLS per RFC 7983
if (packet.data()[0] < 20 || packet.data()[0] > 63) {
return;
}
nspr_io_adapter_->PacketReceived(packet);
GetDecryptedPackets();
}
void
TransportLayerDtls::GetDecryptedPackets()
{
// If we're still connecting, try to handshake
if (state_ == TS_CONNECTING) {
Handshake();
@ -1014,16 +1013,20 @@ void TransportLayerDtls::PacketReceived(TransportLayer* layer,
// Now try a recv if we're open, since there might be data left
if (state_ == TS_OPEN) {
// nICEr uses a 9216 bytes buffer to allow support for jumbo frames
unsigned char buf[9216];
int32_t rv;
// One packet might contain several DTLS packets
do {
rv = PR_Recv(ssl_fd_.get(), buf, sizeof(buf), 0, PR_INTERVAL_NO_WAIT);
// nICEr uses a 9216 bytes buffer to allow support for jumbo frames
// Can we peek to get a better idea of the actual size?
static const size_t kBufferSize = 9216;
auto buffer = MakeUnique<uint8_t[]>(kBufferSize);
rv = PR_Recv(ssl_fd_.get(), buffer.get(), kBufferSize, 0, PR_INTERVAL_NO_WAIT);
if (rv > 0) {
// We have data
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "Read " << rv << " bytes from NSS");
SignalPacketReceived(this, buf, rv);
MediaPacket packet;
packet.Take(std::move(buffer), static_cast<size_t>(rv));
SignalPacketReceived(this, packet);
} else if (rv == 0) {
TL_SET_STATE(TS_CLOSED);
} else {
@ -1069,8 +1072,7 @@ void TransportLayerDtls::SetState(State state,
TransportLayer::SetState(state, file, line);
}
TransportResult TransportLayerDtls::SendPacket(const unsigned char *data,
size_t len) {
TransportResult TransportLayerDtls::SendPacket(MediaPacket& packet) {
CheckThread();
if (state_ != TS_OPEN) {
MOZ_MTLOG(ML_ERROR, LAYER_INFO << "Can't call SendPacket() in state "
@ -1078,7 +1080,8 @@ TransportResult TransportLayerDtls::SendPacket(const unsigned char *data,
return TE_ERROR;
}
int32_t rv = PR_Send(ssl_fd_.get(), data, len, 0, PR_INTERVAL_NO_WAIT);
int32_t rv = PR_Send(ssl_fd_.get(), packet.data(), packet.len(), 0,
PR_INTERVAL_NO_WAIT);
if (rv > 0) {
// We have data

Просмотреть файл

@ -23,7 +23,6 @@
#include "ScopedNSSTypes.h"
#include "m_cpp_utils.h"
#include "dtlsidentity.h"
#include "transportflow.h"
#include "transportlayer.h"
namespace mozilla {
@ -37,7 +36,7 @@ class TransportLayerNSPRAdapter {
input_(),
enabled_(true) {}
void PacketReceived(const void *data, int32_t len);
void PacketReceived(MediaPacket& packet);
int32_t Recv(void *buf, int32_t buflen);
int32_t Write(const void *buf, int32_t length);
void SetEnabled(bool enabled) { enabled_ = enabled; }
@ -46,7 +45,7 @@ class TransportLayerNSPRAdapter {
DISALLOW_COPY_ASSIGN(TransportLayerNSPRAdapter);
TransportLayer *output_;
std::queue<Packet *> input_;
std::queue<MediaPacket *> input_;
bool enabled_;
};
@ -95,12 +94,11 @@ class TransportLayerDtls final : public TransportLayer {
// Transport layer overrides.
nsresult InitInternal() override;
void WasInserted() override;
TransportResult SendPacket(const unsigned char *data, size_t len) override;
TransportResult SendPacket(MediaPacket& packet) override;
// Signals
void StateChange(TransportLayer *layer, State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
void PacketReceived(TransportLayer* layer, MediaPacket& packet);
// For testing use only. Returns the fd.
PRFileDesc* internal_fd() { CheckThread(); return ssl_fd_.get(); }
@ -140,6 +138,7 @@ class TransportLayerDtls final : public TransportLayer {
bool Setup();
bool SetupCipherSuites(UniquePRFileDesc& ssl_fd) const;
bool SetupAlpn(UniquePRFileDesc& ssl_fd) const;
void GetDecryptedPackets();
void Handshake();
bool CheckAlpn();

Просмотреть файл

@ -171,22 +171,21 @@ void TransportLayerIce::RestoreOldStream() {
// later.
}
TransportResult TransportLayerIce::SendPacket(const unsigned char *data,
size_t len) {
TransportResult TransportLayerIce::SendPacket(MediaPacket& packet) {
CheckThread();
// use old_stream_ until stream_ is ready
nsresult res = (old_stream_?old_stream_:stream_)->SendPacket(component_,
data,
len);
packet.data(),
packet.len());
if (!NS_SUCCEEDED(res)) {
return (res == NS_BASE_STREAM_WOULD_BLOCK) ?
TE_WOULDBLOCK : TE_ERROR;
}
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " SendPacket(" << len << ") succeeded");
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " SendPacket(" << packet.len() << ") succeeded");
return len;
return packet.len();
}
@ -227,7 +226,12 @@ void TransportLayerIce::IcePacketReceived(NrIceMediaStream *stream, int componen
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "PacketReceived(" << stream->name() << ","
<< component << "," << len << ")");
SignalPacketReceived(this, data, len);
// Might be useful to allow MediaPacket to borrow a buffer (ie; not take
// ownership, but copy it if the MediaPacket is moved). This could be a
// footgun though with MediaPackets that end up on the heap.
MediaPacket packet;
packet.Copy(data, len);
SignalPacketReceived(this, packet);
}
} // close namespace

Просмотреть файл

@ -41,7 +41,7 @@ class TransportLayerIce : public TransportLayer {
void RestoreOldStream(); // called after unsuccessful ice restart
// Transport layer overrides.
TransportResult SendPacket(const unsigned char *data, size_t len) override;
TransportResult SendPacket(MediaPacket& packet) override;
// Slots for ICE
void IceCandidate(NrIceMediaStream *stream, const std::string&);

Просмотреть файл

@ -25,13 +25,13 @@ void TransportLayerLogging::WasInserted() {
}
TransportResult
TransportLayerLogging::SendPacket(const unsigned char *data, size_t len) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "SendPacket(" << len << ")");
TransportLayerLogging::SendPacket(MediaPacket& packet) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "SendPacket(" << packet.len() << ")");
if (downward_) {
return downward_->SendPacket(data, len);
return downward_->SendPacket(packet);
}
return static_cast<TransportResult>(len);
return static_cast<TransportResult>(packet.len());
}
void TransportLayerLogging::StateChange(TransportLayer *layer, State state) {
@ -41,13 +41,10 @@ void TransportLayerLogging::StateChange(TransportLayer *layer, State state) {
}
void TransportLayerLogging::PacketReceived(TransportLayer* layer,
const unsigned char *data,
size_t len) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "PacketReceived(" << len << ")");
MediaPacket& packet) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "PacketReceived(" << packet.len() << ")");
SignalPacketReceived(this, data, len);
SignalPacketReceived(this, packet);
}
} // close namespace

Просмотреть файл

@ -20,12 +20,11 @@ public:
TransportLayerLogging() {}
// Overrides for TransportLayer
TransportResult SendPacket(const unsigned char *data, size_t len) override;
TransportResult SendPacket(MediaPacket& packet) override;
// Signals (forwarded to upper layer)
void StateChange(TransportLayer *layer, State state);
void PacketReceived(TransportLayer* layer, const unsigned char *data,
size_t len);
void PacketReceived(TransportLayer* layer, MediaPacket& packet);
TRANSPORT_LAYER_ID("log")

Просмотреть файл

@ -60,39 +60,38 @@ void TransportLayerLoopback::Connect(TransportLayerLoopback* peer) {
}
TransportResult
TransportLayerLoopback::SendPacket(const unsigned char *data, size_t len) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "SendPacket(" << len << ")");
TransportLayerLoopback::SendPacket(MediaPacket& packet) {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << "SendPacket(" << packet.len() << ")");
if (!peer_) {
MOZ_MTLOG(ML_ERROR, "Discarding packet because peer not attached");
return TE_ERROR;
}
nsresult res = peer_->QueuePacket(data, len);
size_t len = packet.len();
nsresult res = peer_->QueuePacket(packet);
if (!NS_SUCCEEDED(res))
return TE_ERROR;
return static_cast<TransportResult>(len);
}
nsresult TransportLayerLoopback::QueuePacket(const unsigned char *data,
size_t len) {
nsresult TransportLayerLoopback::QueuePacket(MediaPacket& packet) {
MOZ_ASSERT(packets_lock_);
PR_Lock(packets_lock_);
if (combinePackets_ && !packets_.empty()) {
QueuedPacket *packet = packets_.front();
packets_.pop();
MediaPacket *prevPacket = packets_.front();
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " Enqueuing combined packets of length " << packet->len() << " and " << len);
packets_.push(new QueuedPacket());
packets_.back()->Assign(packet->data(), packet->len(),
data, len);
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " Enqueuing combined packets of length " << prevPacket->len() << " and " << packet.len());
auto combined = MakeUnique<uint8_t[]>(prevPacket->len() + packet.len());
memcpy(combined.get(), prevPacket->data(), prevPacket->len());
memcpy(combined.get() + prevPacket->len(), packet.data(), packet.len());
prevPacket->Take(std::move(combined), prevPacket->len() + packet.len());
} else {
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " Enqueuing packet of length " << len);
packets_.push(new QueuedPacket());
packets_.back()->Assign(data, len);
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " Enqueuing packet of length " << packet.len());
packets_.push(new MediaPacket(std::move(packet)));
}
PRStatus r = PR_Unlock(packets_lock_);
@ -106,14 +105,12 @@ nsresult TransportLayerLoopback::QueuePacket(const unsigned char *data,
void TransportLayerLoopback::DeliverPackets() {
while (!packets_.empty()) {
QueuedPacket *packet = packets_.front();
UniquePtr<MediaPacket> packet(packets_.front());
packets_.pop();
MOZ_MTLOG(ML_DEBUG, LAYER_INFO << " Delivering packet of length " <<
packet->len());
SignalPacketReceived(this, packet->data(), packet->len());
delete packet;
SignalPacketReceived(this, *packet);
}
}

Просмотреть файл

@ -42,7 +42,7 @@ class TransportLayerLoopback : public TransportLayer {
~TransportLayerLoopback() {
while (!packets_.empty()) {
QueuedPacket *packet = packets_.front();
MediaPacket *packet = packets_.front();
packets_.pop();
delete packet;
}
@ -72,7 +72,7 @@ class TransportLayerLoopback : public TransportLayer {
void CombinePackets(bool combine) { combinePackets_ = combine; }
// Overrides for TransportLayer
TransportResult SendPacket(const unsigned char *data, size_t len) override;
TransportResult SendPacket(MediaPacket& packet) override;
// Deliver queued packets
void DeliverPackets();
@ -82,41 +82,6 @@ class TransportLayerLoopback : public TransportLayer {
private:
DISALLOW_COPY_ASSIGN(TransportLayerLoopback);
// A queued packet
class QueuedPacket {
public:
QueuedPacket() : data_(nullptr), len_(0) {}
~QueuedPacket() {
delete [] data_;
}
void Assign(const unsigned char *data, size_t len) {
data_ = new unsigned char[len];
memcpy(static_cast<void *>(data_),
static_cast<const void *>(data), len);
len_ = len;
}
void Assign(const unsigned char *data1, size_t len1,
const unsigned char *data2, size_t len2) {
data_ = new unsigned char[len1 + len2];
memcpy(static_cast<void *>(data_),
static_cast<const void *>(data1), len1);
memcpy(static_cast<void *>(data_ + len1),
static_cast<const void *>(data2), len2);
len_ = len1 + len2;
}
const unsigned char *data() const { return data_; }
size_t len() const { return len_; }
private:
DISALLOW_COPY_ASSIGN(QueuedPacket);
unsigned char *data_;
size_t len_;
};
// A timer to deliver packets if some are available
// Fires every 100 ms
class Deliverer : public nsITimerCallback
@ -142,11 +107,11 @@ class TransportLayerLoopback : public TransportLayer {
};
// Queue a packet for delivery
nsresult QueuePacket(const unsigned char *data, size_t len);
nsresult QueuePacket(MediaPacket& packet);
TransportLayerLoopback* peer_;
nsCOMPtr<nsITimer> timer_;
std::queue<QueuedPacket *> packets_;
std::queue<MediaPacket *> packets_;
PRLock *packets_lock_;
RefPtr<Deliverer> deliverer_;
bool combinePackets_;

Просмотреть файл

@ -0,0 +1,273 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#include "transportlayersrtp.h"
#include "transportlayerdtls.h"
#include "logging.h"
#include "nsError.h"
#include "mozilla/Assertions.h"
#include "transportlayerdtls.h"
#include "srtp.h"
#include "nsAutoPtr.h"
namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
TransportLayerSrtp::TransportLayerSrtp(TransportLayerDtls& dtls)
{
// We need to connect to the dtls layer, not the ice layer, because even
// though the packets that DTLS decrypts don't flow through us, we do base our
// keying information on the keying information established by the DTLS layer.
dtls.SignalStateChange.connect(this, &TransportLayerSrtp::StateChange);
TL_SET_STATE(dtls.state());
}
void
TransportLayerSrtp::WasInserted()
{
// Connect to the lower layers
if (!Setup()) {
TL_SET_STATE(TS_ERROR);
}
}
bool
TransportLayerSrtp::Setup()
{
CheckThread();
if (!downward_) {
MOZ_MTLOG(ML_ERROR, "SRTP layer with nothing below. This is useless");
return false;
}
// downward_ is the TransportLayerIce
downward_->SignalPacketReceived.connect(this, &TransportLayerSrtp::PacketReceived);
return true;
}
static bool IsRtp(const unsigned char* data, size_t len)
{
if (len < 2)
return false;
// Check if this is a RTCP packet. Logic based on the types listed in
// media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc
// Anything outside this range is RTP.
if ((data[1] < 192) || (data[1] > 207))
return true;
if (data[1] == 192) // FIR
return false;
if (data[1] == 193) // NACK, but could also be RTP. This makes us sad
return true; // but it's how webrtc.org behaves.
if (data[1] == 194)
return true;
if (data[1] == 195) // IJ.
return false;
if ((data[1] > 195) && (data[1] < 200)) // the > 195 is redundant
return true;
if ((data[1] >= 200) && (data[1] <= 207)) // SR, RR, SDES, BYE,
return false; // APP, RTPFB, PSFB, XR
MOZ_ASSERT(false); // Not reached, belt and suspenders.
return true;
}
TransportResult
TransportLayerSrtp::SendPacket(MediaPacket& packet)
{
if (packet.len() < 4) {
MOZ_ASSERT(false);
return TE_ERROR;
}
MOZ_ASSERT(packet.capacity() - packet.len() >= SRTP_MAX_EXPANSION);
int out_len;
nsresult res;
switch (packet.type()) {
case MediaPacket::RTP:
MOZ_MTLOG(ML_INFO, "Attempting to protect RTP...");
res = mSendSrtp->ProtectRtp(packet.data(), packet.len(), packet.capacity(), &out_len);
break;
case MediaPacket::RTCP:
MOZ_MTLOG(ML_INFO, "Attempting to protect RTCP...");
res = mSendSrtp->ProtectRtcp(packet.data(), packet.len(), packet.capacity(), &out_len);
break;
default:
MOZ_CRASH("SRTP layer asked to send packet that is neither RTP or RTCP");
}
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR,
"Error protecting RTP/RTCP len=" << packet.len()
<< "[" << std::hex
<< packet.data()[0] << " "
<< packet.data()[1] << " "
<< packet.data()[2] << " "
<< packet.data()[3]
<< "]");
return TE_ERROR;
}
size_t unencrypted_len = packet.len();
packet.SetLength(out_len);
TransportResult bytes = downward_->SendPacket(packet);
if (bytes == out_len) {
// Whole packet was written, but the encrypted length might be different.
// Don't confuse the caller.
return unencrypted_len;
}
if (bytes == TE_WOULDBLOCK) {
return TE_WOULDBLOCK;
}
return TE_ERROR;
}
void
TransportLayerSrtp::StateChange(TransportLayer* layer, State state)
{
if (state == TS_OPEN) {
TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(layer);
MOZ_ASSERT(dtls); // DTLS is mandatory
uint16_t cipher_suite;
nsresult res = dtls->GetSrtpCipher(&cipher_suite);
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Failed to negotiate DTLS-SRTP. This is an error");
TL_SET_STATE(TS_ERROR);
return;
}
// SRTP Key Exporter as per RFC 5764 S 4.2
unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
res = dtls->ExportKeyingMaterial(
kDTLSExporterLabel, false, "", srtp_block, sizeof(srtp_block));
if (NS_FAILED(res)) {
MOZ_MTLOG(ML_ERROR, "Failed to compute DTLS-SRTP keys. This is an error");
TL_SET_STATE(TS_ERROR);
return;
}
// Slice and dice as per RFC 5764 S 4.2
unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH];
int offset = 0;
memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset,
SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset,
SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
MOZ_ASSERT(offset == sizeof(srtp_block));
unsigned char* write_key;
unsigned char* read_key;
if (dtls->role() == TransportLayerDtls::CLIENT) {
write_key = client_write_key;
read_key = server_write_key;
} else {
write_key = server_write_key;
read_key = client_write_key;
}
MOZ_ASSERT(!mSendSrtp && !mRecvSrtp);
mSendSrtp =
SrtpFlow::Create(cipher_suite, false, write_key, SRTP_TOTAL_KEY_LENGTH);
mRecvSrtp =
SrtpFlow::Create(cipher_suite, true, read_key, SRTP_TOTAL_KEY_LENGTH);
if (!mSendSrtp || !mRecvSrtp) {
MOZ_MTLOG(ML_ERROR, "Couldn't create SRTP flow.");
TL_SET_STATE(TS_ERROR);
return;
}
MOZ_MTLOG(ML_INFO, "Created SRTP flow!");
}
TL_SET_STATE(state);
}
void
TransportLayerSrtp::PacketReceived(TransportLayer* layer, MediaPacket& packet)
{
if (state() != TS_OPEN) {
return;
}
if (!packet.data()) {
// Something ate this, probably the DTLS layer
return;
}
if (packet.len() < 4) {
return;
}
// not RTP/RTCP per RFC 7983
if (packet.data()[0] <= 127 || packet.data()[0] >= 192) {
return;
}
// We want to keep the encrypted packet around for packet dumping
packet.CopyDataToEncrypted();
int outLen;
nsresult res;
if (IsRtp(packet.data(), packet.len())) {
packet.SetType(MediaPacket::RTP);
MOZ_MTLOG(ML_INFO, "Attempting to unprotect RTP...");
res = mRecvSrtp->UnprotectRtp(packet.data(), packet.len(), packet.len(), &outLen);
} else {
packet.SetType(MediaPacket::RTCP);
MOZ_MTLOG(ML_INFO, "Attempting to unprotect RTCP...");
res = mRecvSrtp->UnprotectRtcp(packet.data(), packet.len(), packet.len(), &outLen);
}
if (NS_SUCCEEDED(res)) {
packet.SetLength(outLen);
SignalPacketReceived(this, packet);
} else {
// TODO: What do we do wrt packet dumping here? Maybe signal an empty
// packet? Signal the still-encrypted packet?
MOZ_MTLOG(ML_ERROR,
"Error unprotecting RTP/RTCP len=" << packet.len()
<< "[" << std::hex
<< packet.data()[0] << " "
<< packet.data()[1] << " "
<< packet.data()[2] << " "
<< packet.data()[3]
<< "]");
}
}
} // namespace mozilla

Просмотреть файл

@ -0,0 +1,44 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef transportlayersrtp_h__
#define transportlayersrtp_h__
#include <string>
#include "transportlayer.h"
#include "mozilla/RefPtr.h"
#include "SrtpFlow.h"
namespace mozilla {
class TransportLayerDtls;
class TransportLayerSrtp final : public TransportLayer {
public:
explicit TransportLayerSrtp(TransportLayerDtls& dtls);
virtual ~TransportLayerSrtp() {};
// Transport layer overrides.
void WasInserted() override;
TransportResult SendPacket(MediaPacket& packet) override;
// Signals
void StateChange(TransportLayer *layer, State state);
void PacketReceived(TransportLayer* layer, MediaPacket& packet);
TRANSPORT_LAYER_ID("srtp")
private:
bool Setup();
DISALLOW_COPY_ASSIGN(TransportLayerSrtp);
RefPtr<SrtpFlow> mSendSrtp;
RefPtr<SrtpFlow> mRecvSrtp;
};
} // close namespace
#endif

Просмотреть файл

@ -24,6 +24,7 @@
#include "transportflow.h"
#include "transportlayerloopback.h"
#include "transportlayerdtls.h"
#include "transportlayersrtp.h"
#include "mozilla/SyncRunnable.h"
#include "mtransport_test_utils.h"
#include "SharedBuffer.h"
@ -169,52 +170,40 @@ class TransportInfo {
public:
TransportInfo() :
flow_(nullptr),
loopback_(nullptr),
dtls_(nullptr) {}
loopback_(nullptr) {}
static void InitAndConnect(TransportInfo &client, TransportInfo &server) {
client.Init(true);
server.Init(false);
client.PushLayers();
server.PushLayers();
client.Connect(&server);
server.Connect(&client);
}
void Init(bool client) {
nsresult res;
flow_ = new TransportFlow();
loopback_ = new TransportLayerLoopback();
dtls_ = new TransportLayerDtls();
res = loopback_->Init();
if (res != NS_OK) {
FreeLayers();
}
ASSERT_EQ((nsresult)NS_OK, res);
UniquePtr<TransportLayerLoopback> loopback(new TransportLayerLoopback);
UniquePtr<TransportLayerDtls> dtls(new TransportLayerDtls);
UniquePtr<TransportLayerSrtp> srtp(new TransportLayerSrtp(*dtls));
std::vector<uint16_t> ciphers;
ciphers.push_back(SRTP_AES128_CM_HMAC_SHA1_80);
dtls_->SetSrtpCiphers(ciphers);
dtls_->SetIdentity(DtlsIdentity::Generate());
dtls_->SetRole(client ? TransportLayerDtls::CLIENT :
dtls->SetSrtpCiphers(ciphers);
dtls->SetIdentity(DtlsIdentity::Generate());
dtls->SetRole(client ? TransportLayerDtls::CLIENT :
TransportLayerDtls::SERVER);
dtls_->SetVerificationAllowAll();
}
dtls->SetVerificationAllowAll();
void PushLayers() {
nsresult res;
ASSERT_EQ(NS_OK, loopback->Init());
ASSERT_EQ(NS_OK, dtls->Init());
ASSERT_EQ(NS_OK, srtp->Init());
nsAutoPtr<std::queue<TransportLayer *> > layers(
new std::queue<TransportLayer *>);
layers->push(loopback_);
layers->push(dtls_);
res = flow_->PushLayers(layers);
if (res != NS_OK) {
FreeLayers();
}
ASSERT_EQ((nsresult)NS_OK, res);
dtls->Chain(loopback.get());
srtp->Chain(loopback.get());
flow_ = new TransportFlow();
loopback_ = loopback.release();
flow_->PushLayer(loopback_);
flow_->PushLayer(dtls.release());
flow_->PushLayer(srtp.release());
}
void Connect(TransportInfo* peer) {
@ -224,27 +213,16 @@ class TransportInfo {
loopback_->Connect(peer->loopback_);
}
// Free the memory allocated at the beginning of Init
// if failure occurs before layers setup.
void FreeLayers() {
delete loopback_;
loopback_ = nullptr;
delete dtls_;
dtls_ = nullptr;
}
void Shutdown() {
if (loopback_) {
loopback_->Disconnect();
}
loopback_ = nullptr;
dtls_ = nullptr;
flow_ = nullptr;
}
RefPtr<TransportFlow> flow_;
TransportLayerLoopback *loopback_;
TransportLayerDtls *dtls_;
};
class TestAgent {
@ -369,9 +347,9 @@ class TestAgentSend : public TestAgent {
nullptr,
test_utils->sts_target(),
false,
audio_stream_track_.get(),
audio_conduit_);
audio_pipeline->SetTrack(audio_stream_track_.get());
audio_pipeline->Start();
audio_pipeline_ = audio_pipeline;

Просмотреть файл

@ -30,7 +30,6 @@
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
#include "databuffer.h"
#include "libyuv/convert.h"
#include "mozilla/PeerIdentity.h"
#include "mozilla/Preferences.h"
@ -675,8 +674,6 @@ protected:
UniquePtr<AudioConverter> mAudioConverter;
};
static char kDTLSExporterLabel[] = "EXTRACTOR-dtls_srtp";
MediaPipeline::MediaPipeline(const std::string& aPc,
DirectionType aDirection,
nsCOMPtr<nsIEventTarget> aMainThread,
@ -689,9 +686,7 @@ MediaPipeline::MediaPipeline(const std::string& aPc,
, mRtcp(nullptr, RTCP)
, mMainThread(aMainThread)
, mStsThread(aStsThread)
, mTransport(new PipelineTransport(this)) // PipelineTransport() will access
// this->mStsThread; moved here
// for safety
, mTransport(new PipelineTransport(aStsThread))
, mRtpPacketsSent(0)
, mRtcpPacketsSent(0)
, mRtpPacketsReceived(0)
@ -866,9 +861,9 @@ MediaPipeline::GetContributingSourceStats(
}
void
MediaPipeline::StateChange(TransportFlow* aFlow, TransportLayer::State aState)
MediaPipeline::StateChange(TransportLayer* aLayer, TransportLayer::State aState)
{
TransportInfo* info = GetTransportInfo_s(aFlow);
TransportInfo* info = GetTransportInfo_s(aLayer);
MOZ_ASSERT(info);
if (aState == TransportLayer::TS_OPEN) {
@ -921,101 +916,14 @@ MediaPipeline::TransportReady_s(TransportInfo& aInfo)
mDescription.c_str(),
ToString(aInfo.mType));
// TODO(bcampen@mozilla.com): Should we disconnect from the flow on failure?
nsresult res;
// Now instantiate the SRTP objects
TransportLayerDtls* dtls = static_cast<TransportLayerDtls*>(
aInfo.mTransport->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls); // DTLS is mandatory
uint16_t cipher_suite;
res = dtls->GetSrtpCipher(&cipher_suite);
if (NS_FAILED(res)) {
CSFLogError(LOGTAG, "Failed to negotiate DTLS-SRTP. This is an error");
aInfo.mState = StateType::MP_CLOSED;
UpdateRtcpMuxState(aInfo);
return res;
}
// SRTP Key Exporter as per RFC 5764 S 4.2
unsigned char srtp_block[SRTP_TOTAL_KEY_LENGTH * 2];
res = dtls->ExportKeyingMaterial(
kDTLSExporterLabel, false, "", srtp_block, sizeof(srtp_block));
if (NS_FAILED(res)) {
CSFLogError(LOGTAG, "Failed to compute DTLS-SRTP keys. This is an error");
aInfo.mState = StateType::MP_CLOSED;
UpdateRtcpMuxState(aInfo);
MOZ_CRASH(); // TODO: Remove once we have enough field experience to
// know it doesn't happen. bug 798797. Note that the
// code after this never executes.
return res;
}
// Slice and dice as per RFC 5764 S 4.2
unsigned char client_write_key[SRTP_TOTAL_KEY_LENGTH];
unsigned char server_write_key[SRTP_TOTAL_KEY_LENGTH];
int offset = 0;
memcpy(client_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(server_write_key, srtp_block + offset, SRTP_MASTER_KEY_LENGTH);
offset += SRTP_MASTER_KEY_LENGTH;
memcpy(client_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset,
SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
memcpy(server_write_key + SRTP_MASTER_KEY_LENGTH,
srtp_block + offset,
SRTP_MASTER_SALT_LENGTH);
offset += SRTP_MASTER_SALT_LENGTH;
MOZ_ASSERT(offset == sizeof(srtp_block));
unsigned char* write_key;
unsigned char* read_key;
if (dtls->role() == TransportLayerDtls::CLIENT) {
write_key = client_write_key;
read_key = server_write_key;
} else {
write_key = server_write_key;
read_key = client_write_key;
}
MOZ_ASSERT(!aInfo.mSendSrtp && !aInfo.mRecvSrtp);
aInfo.mSendSrtp =
SrtpFlow::Create(cipher_suite, false, write_key, SRTP_TOTAL_KEY_LENGTH);
aInfo.mRecvSrtp =
SrtpFlow::Create(cipher_suite, true, read_key, SRTP_TOTAL_KEY_LENGTH);
if (!aInfo.mSendSrtp || !aInfo.mRecvSrtp) {
CSFLogError(
LOGTAG, "Couldn't create SRTP flow for %s", ToString(aInfo.mType));
aInfo.mState = StateType::MP_CLOSED;
UpdateRtcpMuxState(aInfo);
return NS_ERROR_FAILURE;
}
if (mDirection == DirectionType::RECEIVE) {
CSFLogInfo(LOGTAG,
"Listening for %s packets received on %p",
ToString(aInfo.mType),
dtls->downward());
aInfo.mSrtp);
switch (aInfo.mType) {
case RTP:
dtls->downward()->SignalPacketReceived.connect(
this, &MediaPipeline::RtpPacketReceived);
break;
case RTCP:
dtls->downward()->SignalPacketReceived.connect(
this, &MediaPipeline::RtcpPacketReceived);
break;
case MUX:
dtls->downward()->SignalPacketReceived.connect(
this, &MediaPipeline::PacketReceived);
break;
default:
MOZ_CRASH();
}
aInfo.mSrtp->SignalPacketReceived.connect(
this, &MediaPipeline::PacketReceived);
}
aInfo.mState = StateType::MP_OPEN;
@ -1050,28 +958,19 @@ MediaPipeline::UpdateRtcpMuxState(TransportInfo& aInfo)
if (aInfo.mType == MUX) {
if (aInfo.mTransport == mRtcp.mTransport) {
mRtcp.mState = aInfo.mState;
if (!mRtcp.mSendSrtp) {
mRtcp.mSendSrtp = aInfo.mSendSrtp;
mRtcp.mRecvSrtp = aInfo.mRecvSrtp;
}
}
}
}
nsresult
MediaPipeline::SendPacket(const TransportFlow* aFlow, const void* aData, int aLen)
MediaPipeline::SendPacket(TransportLayer* aLayer, MediaPacket& packet)
{
ASSERT_ON_THREAD(mStsThread);
// Note that we bypass the DTLS layer here
TransportLayerDtls* dtls =
static_cast<TransportLayerDtls*>(aFlow->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls);
int len = packet.len();
TransportResult res = aLayer->SendPacket(packet);
TransportResult res =
dtls->downward()->SendPacket(static_cast<const unsigned char*>(aData), aLen);
if (res != aLen) {
if (res != len) {
// Ignore blocking indications
if (res == TE_WOULDBLOCK)
return NS_OK;
@ -1148,9 +1047,7 @@ MediaPipeline::IncrementRtcpPacketsReceived()
}
void
MediaPipeline::RtpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen)
MediaPipeline::RtpPacketReceived(TransportLayer* aLayer, MediaPacket& packet)
{
if (mDirection == DirectionType::TRANSMIT) {
return;
@ -1171,25 +1068,17 @@ MediaPipeline::RtpPacketReceived(TransportLayer* aLayer,
return;
}
if (mRtp.mTransport->state() != TransportLayer::TS_OPEN) {
if (mRtp.mSrtp->state() != TransportLayer::TS_OPEN) {
CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
return;
}
// This should never happen.
MOZ_ASSERT(mRtp.mRecvSrtp);
if (!aLen) {
return;
}
// Filter out everything but RTP/RTCP
if (aData[0] < 128 || aData[0] > 191) {
if (!packet.len()) {
return;
}
webrtc::RTPHeader header;
if (!mRtpParser->Parse(aData, aLen, &header, true)) {
if (!mRtpParser->Parse(packet.data(), packet.len(), &header, true)) {
return;
}
@ -1235,49 +1124,26 @@ MediaPipeline::RtpPacketReceived(TransportLayer* aLayer,
}
}
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false, aData, aLen);
// Make a copy rather than cast away constness
auto innerData = MakeUnique<unsigned char[]>(aLen);
memcpy(innerData.get(), aData, aLen);
int outLen = 0;
nsresult res =
mRtp.mRecvSrtp->UnprotectRtp(innerData.get(), aLen, aLen, &outLen);
if (!NS_SUCCEEDED(res)) {
char tmp[16];
SprintfLiteral(tmp,
"%.2x %.2x %.2x %.2x",
innerData[0],
innerData[1],
innerData[2],
innerData[3]);
CSFLogError(LOGTAG,
"Error unprotecting RTP in %s len= %zu [%s]",
mDescription.c_str(),
aLen,
tmp);
return;
}
CSFLogDebug(LOGTAG, "%s received RTP packet.", mDescription.c_str());
IncrementRtpPacketsReceived(outLen);
IncrementRtpPacketsReceived(packet.len());
OnRtpPacketReceived();
RtpLogger::LogPacket(
innerData.get(), outLen, true, true, header.headerLength, mDescription);
RtpLogger::LogPacket(packet, true, header.headerLength, mDescription);
// Might be nice to pass ownership of the buffer in this case, but it is a
// small optimization in a rare case.
mPacketDumper->Dump(
mLevel, dom::mozPacketDumpType::Srtp, false, packet.encrypted_data(), packet.encrypted_len());
mPacketDumper->Dump(
mLevel, dom::mozPacketDumpType::Rtp, false, innerData.get(), outLen);
mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(), packet.len());
(void)mConduit->ReceivedRTPPacket(
innerData.get(), outLen, header.ssrc); // Ignore error codes
packet.data(), packet.len(), header.ssrc); // Ignore error codes
}
void
MediaPipeline::RtcpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen)
MediaPipeline::RtcpPacketReceived(TransportLayer* aLayer, MediaPacket& packet)
{
if (!mTransport->Pipeline()) {
CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
@ -1294,103 +1160,55 @@ MediaPipeline::RtcpPacketReceived(TransportLayer* aLayer,
return;
}
if (mRtcp.mTransport->state() != TransportLayer::TS_OPEN) {
if (mRtcp.mSrtp->state() != TransportLayer::TS_OPEN) {
CSFLogError(LOGTAG, "Discarding incoming packet; transport not open");
return;
}
if (!aLen) {
return;
}
// Filter out everything but RTP/RTCP
if (aData[0] < 128 || aData[0] > 191) {
if (!packet.len()) {
return;
}
// We do not filter receiver reports, since the webrtc.org code for
// senders already has logic to ignore RRs that do not apply.
// TODO bug 1279153: remove SR check for reduced size RTCP
if (mFilter && !mFilter->FilterSenderReport(aData, aLen)) {
if (mFilter && !mFilter->FilterSenderReport(packet.data(), packet.len())) {
CSFLogWarn(LOGTAG, "Dropping incoming RTCP packet; filtered out");
return;
}
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false, aData, aLen);
// Make a copy rather than cast away constness
auto innerData = MakeUnique<unsigned char[]>(aLen);
memcpy(innerData.get(), aData, aLen);
int outLen;
nsresult res =
mRtcp.mRecvSrtp->UnprotectRtcp(innerData.get(), aLen, aLen, &outLen);
if (!NS_SUCCEEDED(res))
return;
CSFLogDebug(LOGTAG, "%s received RTCP packet.", mDescription.c_str());
IncrementRtcpPacketsReceived();
RtpLogger::LogPacket(innerData.get(), outLen, true, false, 0, mDescription);
RtpLogger::LogPacket(packet, true, 0, mDescription);
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false, aData, aLen);
// Might be nice to pass ownership of the buffer in this case, but it is a
// small optimization in a rare case.
mPacketDumper->Dump(
mLevel, dom::mozPacketDumpType::Srtcp, false, packet.encrypted_data(), packet.encrypted_len());
MOZ_ASSERT(mRtcp.mRecvSrtp); // This should never happen
mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false, packet.data(), packet.len());
(void)mConduit->ReceivedRTCPPacket(innerData.get(),
outLen); // Ignore error codes
}
bool
MediaPipeline::IsRtp(const unsigned char* aData, size_t aLen) const
{
if (aLen < 2)
return false;
// Check if this is a RTCP packet. Logic based on the types listed in
// media/webrtc/trunk/src/modules/rtp_rtcp/source/rtp_utility.cc
// Anything outside this range is RTP.
if ((aData[1] < 192) || (aData[1] > 207))
return true;
if (aData[1] == 192) // FIR
return false;
if (aData[1] == 193) // NACK, but could also be RTP. This makes us sad
return true; // but it's how webrtc.org behaves.
if (aData[1] == 194)
return true;
if (aData[1] == 195) // IJ.
return false;
if ((aData[1] > 195) && (aData[1] < 200)) // the > 195 is redundant
return true;
if ((aData[1] >= 200) && (aData[1] <= 207)) // SR, RR, SDES, BYE,
return false; // APP, RTPFB, PSFB, XR
MOZ_ASSERT(false); // Not reached, belt and suspenders.
return true;
(void)mConduit->ReceivedRTCPPacket(packet.data(), packet.len()); // Ignore error codes
}
void
MediaPipeline::PacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen)
MediaPipeline::PacketReceived(TransportLayer* aLayer, MediaPacket& packet)
{
if (!mTransport->Pipeline()) {
CSFLogDebug(LOGTAG, "Discarding incoming packet; transport disconnected");
return;
}
if (IsRtp(aData, aLen)) {
RtpPacketReceived(aLayer, aData, aLen);
} else {
RtcpPacketReceived(aLayer, aData, aLen);
switch (packet.type()) {
case MediaPacket::RTP:
RtpPacketReceived(aLayer, packet);
break;
case MediaPacket::RTCP:
RtcpPacketReceived(aLayer, packet);
break;
default:
MOZ_CRASH("TransportLayerSrtp let something other than RTP/RTCP through");
}
}
@ -1519,7 +1337,6 @@ MediaPipelineTransmit::MediaPipelineTransmit(
nsCOMPtr<nsIEventTarget> aMainThread,
nsCOMPtr<nsIEventTarget> aStsThread,
bool aIsVideo,
dom::MediaStreamTrack* aDomTrack,
RefPtr<MediaSessionConduit> aConduit)
: MediaPipeline(aPc,
DirectionType::TRANSMIT,
@ -1534,10 +1351,8 @@ MediaPipelineTransmit::MediaPipelineTransmit(
// calls back to a VideoFrameFeeder
// that feeds I420 frames to
// VideoConduit.
, mDomTrack(aDomTrack)
, mTransmitting(false)
{
SetDescription();
if (!IsVideo()) {
mAudioProcessing = MakeAndAddRef<AudioProxyThread>(
static_cast<AudioSessionConduit*>(aConduit.get()));
@ -1719,7 +1534,7 @@ MediaPipelineTransmit::TransportReady_s(TransportInfo& aInfo)
}
nsresult
MediaPipelineTransmit::ReplaceTrack(RefPtr<MediaStreamTrack>& aDomTrack)
MediaPipelineTransmit::SetTrack(MediaStreamTrack* aDomTrack)
{
// MainThread, checked in calls we make
if (aDomTrack) {
@ -1750,10 +1565,11 @@ nsresult
MediaPipeline::ConnectTransport_s(TransportInfo& aInfo)
{
MOZ_ASSERT(aInfo.mTransport);
MOZ_ASSERT(aInfo.mSrtp);
ASSERT_ON_THREAD(mStsThread);
// Look to see if the transport is ready
if (aInfo.mTransport->state() == TransportLayer::TS_OPEN) {
if (aInfo.mSrtp->state() == TransportLayer::TS_OPEN) {
nsresult res = TransportReady_s(aInfo);
if (NS_FAILED(res)) {
CSFLogError(LOGTAG,
@ -1762,27 +1578,27 @@ MediaPipeline::ConnectTransport_s(TransportInfo& aInfo)
__FUNCTION__);
return res;
}
} else if (aInfo.mTransport->state() == TransportLayer::TS_ERROR) {
} else if (aInfo.mSrtp->state() == TransportLayer::TS_ERROR) {
CSFLogError(
LOGTAG, "%s transport is already in error state", ToString(aInfo.mType));
TransportFailed_s(aInfo);
return NS_ERROR_FAILURE;
}
aInfo.mTransport->SignalStateChange.connect(this, &MediaPipeline::StateChange);
aInfo.mSrtp->SignalStateChange.connect(this, &MediaPipeline::StateChange);
return NS_OK;
}
MediaPipeline::TransportInfo*
MediaPipeline::GetTransportInfo_s(TransportFlow* aFlow)
MediaPipeline::GetTransportInfo_s(TransportLayer* aLayer)
{
ASSERT_ON_THREAD(mStsThread);
if (aFlow == mRtp.mTransport) {
if (aLayer == mRtp.mSrtp) {
return &mRtp;
}
if (aFlow == mRtcp.mTransport) {
if (aLayer == mRtcp.mSrtp) {
return &mRtcp;
}
@ -1792,16 +1608,15 @@ MediaPipeline::GetTransportInfo_s(TransportFlow* aFlow)
nsresult
MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData, size_t aLen)
{
nsAutoPtr<DataBuffer> buf(
new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));
nsAutoPtr<MediaPacket> packet(new MediaPacket);
packet->Copy(aData, aLen, aLen + SRTP_MAX_EXPANSION);
packet->SetType(MediaPacket::RTP);
RUN_ON_THREAD(
mStsThread,
WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s,
buf,
true),
packet),
NS_DISPATCH_NORMAL);
return NS_OK;
@ -1809,108 +1624,75 @@ MediaPipeline::PipelineTransport::SendRtpPacket(const uint8_t* aData, size_t aLe
nsresult
MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s(
nsAutoPtr<DataBuffer> aData,
bool aIsRtp)
nsAutoPtr<MediaPacket> aPacket)
{
bool isRtp = aPacket->type() == MediaPacket::RTP;
ASSERT_ON_THREAD(mStsThread);
if (!mPipeline) {
return NS_OK; // Detached
}
TransportInfo& transport = aIsRtp ? mPipeline->mRtp : mPipeline->mRtcp;
if (!transport.mSendSrtp) {
CSFLogDebug(LOGTAG, "Couldn't write RTP/RTCP packet; SRTP not set up yet");
TransportInfo& transport = isRtp ? mPipeline->mRtp : mPipeline->mRtcp;
if (transport.mSrtp->state() != TransportLayer::TS_OPEN) {
// SRTP not ready yet.
return NS_OK;
}
MOZ_ASSERT(transport.mTransport);
NS_ENSURE_TRUE(transport.mTransport, NS_ERROR_NULL_POINTER);
// libsrtp enciphers in place, so we need a big enough buffer.
MOZ_ASSERT(aData->capacity() >= aData->len() + SRTP_MAX_EXPANSION);
MediaPacket packet(std::move(*aPacket));
packet.sdp_level() = Some(mPipeline->Level());
if (RtpLogger::IsPacketLoggingOn()) {
int headerLen = 12;
webrtc::RTPHeader header;
if (mPipeline->mRtpParser &&
mPipeline->mRtpParser->Parse(aData->data(), aData->len(), &header)) {
mPipeline->mRtpParser->Parse(packet.data(), packet.len(), &header)) {
headerLen = header.headerLength;
}
RtpLogger::LogPacket(aData->data(),
aData->len(),
false,
aIsRtp,
headerLen,
mPipeline->mDescription);
RtpLogger::LogPacket(packet, false, headerLen, mPipeline->mDescription);
}
int out_len;
nsresult res;
if (aIsRtp) {
if (isRtp) {
mPipeline->mPacketDumper->Dump(mPipeline->Level(),
dom::mozPacketDumpType::Rtp,
true,
aData->data(),
aData->len());
res = transport.mSendSrtp->ProtectRtp(
aData->data(), aData->len(), aData->capacity(), &out_len);
packet.data(),
packet.len());
mPipeline->IncrementRtpPacketsSent(packet.len());
} else {
mPipeline->mPacketDumper->Dump(mPipeline->Level(),
dom::mozPacketDumpType::Rtcp,
true,
aData->data(),
aData->len());
res = transport.mSendSrtp->ProtectRtcp(
aData->data(), aData->len(), aData->capacity(), &out_len);
packet.data(),
packet.len());
mPipeline->IncrementRtcpPacketsSent();
}
if (!NS_SUCCEEDED(res)) {
return res;
}
// paranoia; don't have uninitialized bytes included in data->len()
aData->SetLength(out_len);
CSFLogDebug(LOGTAG,
"%s sending %s packet",
mPipeline->mDescription.c_str(),
(aIsRtp ? "RTP" : "RTCP"));
if (aIsRtp) {
mPipeline->mPacketDumper->Dump(mPipeline->Level(),
dom::mozPacketDumpType::Srtp,
true,
aData->data(),
out_len);
(isRtp ? "RTP" : "RTCP"));
mPipeline->IncrementRtpPacketsSent(out_len);
} else {
mPipeline->mPacketDumper->Dump(mPipeline->Level(),
dom::mozPacketDumpType::Srtcp,
true,
aData->data(),
out_len);
mPipeline->IncrementRtcpPacketsSent();
}
return mPipeline->SendPacket(transport.mTransport, aData->data(), out_len);
return mPipeline->SendPacket(transport.mSrtp, packet);
}
nsresult
MediaPipeline::PipelineTransport::SendRtcpPacket(const uint8_t* aData,
size_t aLen)
{
nsAutoPtr<DataBuffer> buf(
new DataBuffer(aData, aLen, aLen + SRTP_MAX_EXPANSION));
nsAutoPtr<MediaPacket> packet(new MediaPacket);
packet->Copy(aData, aLen, aLen + SRTP_MAX_EXPANSION);
packet->SetType(MediaPacket::RTCP);
RUN_ON_THREAD(
mStsThread,
WrapRunnable(RefPtr<MediaPipeline::PipelineTransport>(this),
&MediaPipeline::PipelineTransport::SendRtpRtcpPacket_s,
buf,
false),
packet),
NS_DISPATCH_NORMAL);
return NS_OK;

Просмотреть файл

@ -16,7 +16,7 @@
#include "mozilla/ReentrantMonitor.h"
#include "mozilla/Atomics.h"
#include "SrtpFlow.h"
#include "databuffer.h"
#include "mediapacket.h"
#include "mtransport/runnable_utils.h"
#include "mtransport/transportflow.h"
#include "AudioPacketizer.h"
@ -185,9 +185,9 @@ public:
{
public:
// Implement the TransportInterface functions
explicit PipelineTransport(MediaPipeline* aPipeline)
: mPipeline(aPipeline)
, mStsThread(aPipeline->mStsThread)
explicit PipelineTransport(nsIEventTarget* aStsThread)
: mPipeline(nullptr)
, mStsThread(aStsThread)
{
}
@ -199,7 +199,7 @@ public:
virtual nsresult SendRtcpPacket(const uint8_t* aData, size_t aLen) override;
private:
nsresult SendRtpRtcpPacket_s(nsAutoPtr<DataBuffer> aData, bool aIsRtp);
nsresult SendRtpRtcpPacket_s(nsAutoPtr<MediaPacket> aData);
// Creates a cycle, which we break with Detach
RefPtr<MediaPipeline> mPipeline;
@ -215,6 +215,7 @@ protected:
{
TransportInfo(RefPtr<TransportFlow> aFlow, RtpType aType)
: mTransport(aFlow)
, mSrtp(mTransport ? mTransport->GetLayer("srtp") : nullptr)
, mState(StateType::MP_CONNECTING)
, mType(aType)
{
@ -223,14 +224,12 @@ protected:
void Detach()
{
mTransport = nullptr;
mSendSrtp = nullptr;
mRecvSrtp = nullptr;
mSrtp = nullptr;
}
RefPtr<TransportFlow> mTransport;
TransportLayer* mSrtp;
StateType mState;
RefPtr<SrtpFlow> mSendSrtp;
RefPtr<SrtpFlow> mRecvSrtp;
RtpType mType;
};
@ -242,7 +241,7 @@ protected:
nsresult ConnectTransport_s(TransportInfo& aInfo);
TransportInfo* GetTransportInfo_s(TransportFlow* aFlow);
TransportInfo* GetTransportInfo_s(TransportLayer* aLayer);
void IncrementRtpPacketsSent(int aBytes);
void IncrementRtcpPacketsSent();
@ -250,21 +249,14 @@ protected:
virtual void OnRtpPacketReceived() {};
void IncrementRtcpPacketsReceived();
virtual nsresult SendPacket(const TransportFlow* aFlow,
const void* aData,
int aLen);
virtual nsresult SendPacket(TransportLayer* aLayer,
MediaPacket& packet);
// Process slots on transports
void StateChange(TransportFlow* aFlow, TransportLayer::State);
void RtpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen);
void RtcpPacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen);
void PacketReceived(TransportLayer* aLayer,
const unsigned char* aData,
size_t aLen);
void StateChange(TransportLayer* aLayer, TransportLayer::State);
void RtpPacketReceived(TransportLayer* aLayer, MediaPacket& packet);
void RtcpPacketReceived(TransportLayer* aLayer, MediaPacket& packet);
void PacketReceived(TransportLayer* aLayer, MediaPacket& packet);
void SetDescription_s(const std::string& description);
@ -326,7 +318,6 @@ public:
nsCOMPtr<nsIEventTarget> aMainThread,
nsCOMPtr<nsIEventTarget> aStsThread,
bool aIsVideo,
dom::MediaStreamTrack* aDomTrack,
RefPtr<MediaSessionConduit> aConduit);
void Start() override;
@ -352,7 +343,7 @@ public:
// In non-compliance with the likely final spec, allow the new
// track to be part of a different stream (since we don't support
// multiple tracks of a type in a stream yet). bug 1056650
virtual nsresult ReplaceTrack(RefPtr<dom::MediaStreamTrack>& aDomTrack);
virtual nsresult SetTrack(dom::MediaStreamTrack* aDomTrack);
// Separate classes to allow ref counting
class PipelineListener;

Просмотреть файл

@ -33,8 +33,9 @@ bool RtpLogger::IsPacketLoggingOn() {
return CSFLogTestLevel(CSF_LOG_DEBUG);
}
void RtpLogger::LogPacket(const unsigned char *data, int len, bool input,
bool isRtp, int headerLength, std::string desc) {
void RtpLogger::LogPacket(const MediaPacket& packet, bool input,
size_t headerLength, std::string desc) {
bool isRtp = (packet.type() == MediaPacket::RTP);
if (CSFLogTestLevel(CSF_LOG_DEBUG)) {
std::stringstream ss;
/* This creates text2pcap compatible format, e.g.:
@ -59,17 +60,17 @@ void RtpLogger::LogPacket(const unsigned char *data, int len, bool input,
#endif
ss << " 000000";
ss << std::hex << std::setfill('0');
int offset_ = headerLength;
if (isRtp && (offset_ + 5 < len)) {
size_t offset_ = headerLength;
if (isRtp && (offset_ + 5 < packet.len())) {
// Allow the first 5 bytes of the payload in clear
offset_ += 5;
}
for (int i=0; i < len; ++i) {
for (size_t i=0; i < packet.len(); ++i) {
if (isRtp && i > offset_) {
ss << " 00";
}
else {
ss << " " << std::setw(2) << (int)data[i];
ss << " " << std::setw(2) << (int)packet.data()[i];
}
}
CSFLogDebug(LOGTAG, "%s%s%s", ss.str().c_str(),

Просмотреть файл

@ -8,6 +8,7 @@
#define rtplogger_h__
#include "webrtc/modules/rtp_rtcp/include/rtp_header_parser.h"
#include "mtransport/mediapacket.h"
namespace mozilla {
@ -19,8 +20,8 @@ namespace mozilla {
class RtpLogger {
public:
static bool IsPacketLoggingOn();
static void LogPacket(const unsigned char *data, int len, bool input,
bool isRtp, int headerLength, std::string desc);
static void LogPacket(const MediaPacket& packet, bool input,
size_t headerLength, std::string desc);
};
} // End of namespace

Просмотреть файл

@ -0,0 +1,77 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
// Original author: ekr@rtfm.com
#include "TransportLayerPacketDumper.h"
#include "logging.h"
#include "nsError.h"
#include "mozilla/Assertions.h"
namespace mozilla {
MOZ_MTLOG_MODULE("mtransport")
TransportLayerPacketDumper::TransportLayerPacketDumper(
nsAutoPtr<PacketDumper>&& aPacketDumper, dom::mozPacketDumpType aType) :
mPacketDumper(std::move(aPacketDumper)),
mType(aType)
{}
void
TransportLayerPacketDumper::WasInserted()
{
CheckThread();
if (!downward_) {
MOZ_MTLOG(ML_ERROR, "Packet dumper with nothing below. This is useless");
TL_SET_STATE(TS_ERROR);
}
downward_->SignalStateChange.connect(this,
&TransportLayerPacketDumper::StateChange);
downward_->SignalPacketReceived.connect(this,
&TransportLayerPacketDumper::PacketReceived);
}
TransportResult
TransportLayerPacketDumper::SendPacket(MediaPacket& packet)
{
if (packet.sdp_level().isSome()) {
dom::mozPacketDumpType dumpType = mType;
if (mType == dom::mozPacketDumpType::Srtp &&
packet.type() == MediaPacket::RTCP) {
dumpType = dom::mozPacketDumpType::Srtcp;
}
mPacketDumper->Dump(*packet.sdp_level(),
dumpType,
true,
packet.data(),
packet.len());
}
return downward_->SendPacket(packet);
}
void
TransportLayerPacketDumper::StateChange(TransportLayer* aLayer, State aState)
{
TL_SET_STATE(aState);
}
void
TransportLayerPacketDumper::PacketReceived(TransportLayer* aLayer,
MediaPacket& packet)
{
// There's no way to know the level yet, so we can't use the packet dumper
// yet. We rely on the SRTP layer saving the encrypted packet in
// MediaPacket::encrypted_, to allow MediaPipeline to dump it later.
SignalPacketReceived(this, packet);
}
} // namespace mozilla

Просмотреть файл

@ -0,0 +1,40 @@
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* vim: set ts=2 et sw=2 tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#ifndef transportlayerpacketdumper_h__
#define transportlayerpacketdumper_h__
#include "transportlayer.h"
#include "signaling/src/peerconnection/PacketDumper.h"
#include "mozilla/dom/RTCPeerConnectionBinding.h"
namespace mozilla {
class TransportLayerPacketDumper final : public TransportLayer {
public:
explicit TransportLayerPacketDumper(nsAutoPtr<PacketDumper>&& aPacketDumper,
dom::mozPacketDumpType aType);
virtual ~TransportLayerPacketDumper() {};
// Transport layer overrides.
void WasInserted() override;
TransportResult SendPacket(MediaPacket& packet) override;
// Signals
void StateChange(TransportLayer *aLayer, State state);
void PacketReceived(TransportLayer* aLayer, MediaPacket& packet);
TRANSPORT_LAYER_ID("packet-dumper")
private:
DISALLOW_COPY_ASSIGN(TransportLayerPacketDumper);
nsAutoPtr<PacketDumper> mPacketDumper;
dom::mozPacketDumpType mType;
};
} // close namespace
#endif

Просмотреть файл

@ -23,7 +23,7 @@ UNIFIED_SOURCES += [
'MediaPipeline.cpp',
'MediaPipelineFilter.cpp',
'RtpLogger.cpp',
'SrtpFlow.cpp',
'TransportLayerPacketDumper.cpp',
]
FINAL_LIBRARY = 'xul'

Просмотреть файл

@ -19,8 +19,11 @@
#include "runnable_utils.h"
#include "transportlayerice.h"
#include "transportlayerdtls.h"
#include "transportlayersrtp.h"
#include "signaling/src/jsep/JsepSession.h"
#include "signaling/src/jsep/JsepTransport.h"
#include "signaling/src/mediapipeline/TransportLayerPacketDumper.h"
#include "signaling/src/peerconnection/PacketDumper.h"
#include "nsContentUtils.h"
#include "nsNetCID.h"
@ -527,21 +530,30 @@ PeerConnectionMedia::UpdateTransportFlows(const JsepTransceiver& aTransceiver)
// the ICE data is destroyed on the STS.
static void
FinalizeTransportFlow_s(RefPtr<PeerConnectionMedia> aPCMedia,
nsAutoPtr<PacketDumper> aPacketDumper,
RefPtr<TransportFlow> aFlow, size_t aLevel,
bool aIsRtcp,
nsAutoPtr<PtrVector<TransportLayer> > aLayerList)
TransportLayerIce* aIceLayer,
TransportLayerDtls* aDtlsLayer,
TransportLayerSrtp* aSrtpLayer)
{
TransportLayerIce* ice =
static_cast<TransportLayerIce*>(aLayerList->values.front());
ice->SetParameters(aPCMedia->ice_media_stream(aLevel),
aIsRtcp ? 2 : 1);
nsAutoPtr<std::queue<TransportLayer*> > layerQueue(
new std::queue<TransportLayer*>);
for (auto& value : aLayerList->values) {
layerQueue->push(value);
}
aLayerList->values.clear();
(void)aFlow->PushLayers(layerQueue); // TODO(bug 854518): Process errors.
TransportLayerPacketDumper* srtpDumper(new TransportLayerPacketDumper(
std::move(aPacketDumper), dom::mozPacketDumpType::Srtp));
aIceLayer->SetParameters(aPCMedia->ice_media_stream(aLevel),
aIsRtcp ? 2 : 1);
// TODO(bug 854518): Process errors.
(void)aIceLayer->Init();
(void)aDtlsLayer->Init();
(void)srtpDumper->Init();
(void)aSrtpLayer->Init();
aDtlsLayer->Chain(aIceLayer);
srtpDumper->Chain(aIceLayer);
aSrtpLayer->Chain(srtpDumper);
aFlow->PushLayer(aIceLayer);
aFlow->PushLayer(aDtlsLayer);
aFlow->PushLayer(srtpDumper);
aFlow->PushLayer(aSrtpLayer);
}
static void
@ -601,6 +613,7 @@ PeerConnectionMedia::UpdateTransportFlow(
// The media streams are made on STS so we need to defer setup.
auto ice = MakeUnique<TransportLayerIce>();
auto dtls = MakeUnique<TransportLayerDtls>();
auto srtp = MakeUnique<TransportLayerSrtp>(*dtls);
dtls->SetRole(aTransport.mDtls->GetRole() ==
JsepDtlsTransport::kJsepDtlsClient
? TransportLayerDtls::CLIENT
@ -652,14 +665,13 @@ PeerConnectionMedia::UpdateTransportFlow(
return rv;
}
nsAutoPtr<PtrVector<TransportLayer> > layers(new PtrVector<TransportLayer>);
layers->values.push_back(ice.release());
layers->values.push_back(dtls.release());
nsAutoPtr<PacketDumper> packetDumper(new PacketDumper(mParent));
RefPtr<PeerConnectionMedia> pcMedia(this);
rv = GetSTSThread()->Dispatch(
WrapRunnableNM(FinalizeTransportFlow_s, pcMedia, flow, aLevel, aIsRtcp,
layers),
WrapRunnableNM(FinalizeTransportFlow_s, pcMedia, packetDumper, flow,
aLevel, aIsRtcp,
ice.release(), dtls.release(), srtp.release()),
NS_DISPATCH_NORMAL);
if (NS_FAILED(rv)) {
CSFLogError(LOGTAG, "Failed to dispatch FinalizeTransportFlow_s");

Просмотреть файл

@ -69,8 +69,9 @@ TransceiverImpl::TransceiverImpl(
mMainThread.get(),
mStsThread.get(),
IsVideo(),
mSendTrack,
mConduit);
mTransmitPipeline->SetTrack(mSendTrack);
}
TransceiverImpl::~TransceiverImpl() = default;
@ -158,7 +159,7 @@ TransceiverImpl::UpdateSendTrack(dom::MediaStreamTrack* aSendTrack)
MOZ_MTLOG(ML_DEBUG, mPCHandle << "[" << mMid << "]: " << __FUNCTION__ <<
"(" << aSendTrack << ")");
mSendTrack = aSendTrack;
return mTransmitPipeline->ReplaceTrack(mSendTrack);
return mTransmitPipeline->SetTrack(mSendTrack);
}
nsresult

Просмотреть файл

@ -313,6 +313,7 @@ DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
mSocket = nullptr;
mMasterSocket = nullptr;
mListener = listener;
mDtls = nullptr;
mLocalPort = 0;
mRemotePort = 0;
mPendingType = PENDING_NONE;
@ -330,7 +331,7 @@ DataChannelConnection::~DataChannelConnection()
ASSERT_WEBRTC(mState == CLOSED);
MOZ_ASSERT(!mMasterSocket);
MOZ_ASSERT(mPending.GetSize() == 0);
MOZ_ASSERT(!mTransportFlow);
MOZ_ASSERT(!mDtls);
// Already disconnected from sigslot/mTransportFlow
// TransportFlows must be released from the STS thread
@ -415,6 +416,7 @@ void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
void DataChannelConnection::DestroyOnSTSFinal()
{
mTransportFlow = nullptr;
mDtls = nullptr;
sDataChannelShutdown->CreateConnectionShutdown(this);
}
@ -670,10 +672,8 @@ DataChannelConnection::SetEvenOdd()
{
ASSERT_WEBRTC(IsSTSThread());
TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
mTransportFlow->GetLayer(TransportLayerDtls::ID()));
MOZ_ASSERT(dtls); // DTLS is mandatory
mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
MOZ_ASSERT(mDtls); // DTLS is mandatory
mAllocateEven = (mDtls->role() == TransportLayerDtls::CLIENT);
}
bool
@ -701,16 +701,17 @@ void
DataChannelConnection::SetSignals()
{
ASSERT_WEBRTC(IsSTSThread());
ASSERT_WEBRTC(mTransportFlow);
LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
mDtls = static_cast<TransportLayerDtls*>(mTransportFlow->GetLayer("dtls"));
ASSERT_WEBRTC(mDtls);
LOG(("Setting transport signals, state: %d", mDtls->state()));
mDtls->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
// SignalStateChange() doesn't call you with the initial state
mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
CompleteConnect(mTransportFlow, mTransportFlow->state());
mDtls->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
CompleteConnect(mDtls, mDtls->state());
}
void
DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
DataChannelConnection::CompleteConnect(TransportLayer *layer, TransportLayer::State state)
{
LOG(("Data transport state: %d", state));
MutexAutoLock lock(mLock);
@ -816,34 +817,33 @@ DataChannelConnection::ProcessQueuedOpens()
}
}
void
DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
const unsigned char *data, size_t len)
DataChannelConnection::SctpDtlsInput(TransportLayer *layer, MediaPacket& packet)
{
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
char *buf;
if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
if ((buf = usrsctp_dumppacket((void *)packet.data(),
packet.len(),
SCTP_DUMP_INBOUND)) != nullptr) {
SCTP_LOG(("%s", buf));
usrsctp_freedumpbuffer(buf);
}
}
// Pass the data to SCTP
MutexAutoLock lock(mLock);
usrsctp_conninput(static_cast<void *>(this), data, len, 0);
usrsctp_conninput(static_cast<void *>(this), packet.data(), packet.len(), 0);
}
int
DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
DataChannelConnection::SendPacket(nsAutoPtr<MediaPacket> packet)
{
//LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
int res = 0;
if (mTransportFlow) {
res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
if (mDtls) {
return mDtls->SendPacket(*packet) < 0 ? 1 : 0;
}
if (release)
delete [] data;
return res;
return 0;
}
/* static */
@ -852,7 +852,6 @@ DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
uint8_t tos, uint8_t set_df)
{
DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
int res;
MOZ_DIAGNOSTIC_ASSERT(!peer->mShutdown);
if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
@ -863,30 +862,24 @@ DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
usrsctp_freedumpbuffer(buf);
}
}
// We're async proxying even if on the STSThread because this is called
// with internal SCTP locks held in some cases (such as in usrsctp_connect()).
// SCTP has an option for Apple, on IP connections only, to release at least
// one of the locks before calling a packet output routine; with changes to
// the underlying SCTP stack this might remove the need to use an async proxy.
if ((false /*peer->IsSTSThread()*/)) {
res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
} else {
auto *data = new unsigned char[length];
memcpy(data, buffer, length);
// Commented out since we have to Dispatch SendPacket to avoid deadlock"
// res = -1;
nsAutoPtr<MediaPacket> packet(new MediaPacket);
packet->Copy(static_cast<const uint8_t*>(buffer), length);
// XXX It might be worthwhile to add an assertion against the thread
// somehow getting into the DataChannel/SCTP code again, as
// DISPATCH_SYNC is not fully blocking. This may be tricky, as it
// needs to be a per-thread check, not a global.
peer->mSTS->Dispatch(WrapRunnable(
RefPtr<DataChannelConnection>(peer),
&DataChannelConnection::SendPacket, data, length, true),
NS_DISPATCH_NORMAL);
res = 0; // cheat! Packets can always be dropped later anyways
}
return res;
// XXX It might be worthwhile to add an assertion against the thread
// somehow getting into the DataChannel/SCTP code again, as
// DISPATCH_SYNC is not fully blocking. This may be tricky, as it
// needs to be a per-thread check, not a global.
peer->mSTS->Dispatch(WrapRunnable(
RefPtr<DataChannelConnection>(peer),
&DataChannelConnection::SendPacket, packet),
NS_DISPATCH_NORMAL);
return 0; // cheat! Packets can always be dropped later anyways
}
#endif

Просмотреть файл

@ -166,7 +166,7 @@ public:
// Connect using a TransportFlow (DTLS) channel
void SetEvenOdd();
bool ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport);
void CompleteConnect(TransportFlow *flow, TransportLayer::State state);
void CompleteConnect(TransportLayer *layer, TransportLayer::State state);
void SetSignals();
#endif
@ -241,8 +241,8 @@ private:
#ifdef SCTP_DTLS_SUPPORTED
static void DTLSConnectThread(void *data);
int SendPacket(unsigned char data[], size_t len, bool release);
void SctpDtlsInput(TransportFlow *flow, const unsigned char *data, size_t len);
int SendPacket(nsAutoPtr<MediaPacket> packet);
void SctpDtlsInput(TransportLayer *layer, MediaPacket& packet);
static int SctpDtlsOutput(void *addr, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
#endif
DataChannel* FindChannelByStream(uint16_t stream);
@ -334,6 +334,7 @@ private:
#ifdef SCTP_DTLS_SUPPORTED
RefPtr<TransportFlow> mTransportFlow;
TransportLayerDtls* mDtls;
nsCOMPtr<nsIEventTarget> mSTS;
#endif
uint16_t mLocalPort; // Accessed from connect thread

Просмотреть файл

@ -36,6 +36,7 @@ use std::slice;
/// (from left to right). Once the process is complete, callers should invoke
/// build(), which transforms the contents of the SelectorBuilder into a heap-
/// allocated Selector and leaves the builder in a drained state.
#[derive(Debug)]
pub struct SelectorBuilder<Impl: SelectorImpl> {
/// The entire sequence of simple selectors, from left to right, without combinators.
///
@ -104,7 +105,7 @@ impl<Impl: SelectorImpl> SelectorBuilder<Impl> {
parsed_slotted: bool,
) -> ThinArc<SpecificityAndFlags, Component<Impl>> {
// Compute the specificity and flags.
let mut spec = SpecificityAndFlags(specificity(self.simple_selectors.iter()));
let mut spec = SpecificityAndFlags(specificity(&*self, self.simple_selectors.iter()));
if parsed_pseudo {
spec.0 |= HAS_PSEUDO_BIT;
}
@ -268,25 +269,35 @@ impl From<Specificity> for u32 {
}
}
fn specificity<Impl>(iter: slice::Iter<Component<Impl>>) -> u32
fn specificity<Impl>(builder: &SelectorBuilder<Impl>, iter: slice::Iter<Component<Impl>>) -> u32
where
Impl: SelectorImpl,
{
complex_selector_specificity(iter).into()
complex_selector_specificity(builder, iter).into()
}
fn complex_selector_specificity<Impl>(mut iter: slice::Iter<Component<Impl>>) -> Specificity
fn complex_selector_specificity<Impl>(
builder: &SelectorBuilder<Impl>,
mut iter: slice::Iter<Component<Impl>>,
) -> Specificity
where
Impl: SelectorImpl,
{
fn simple_selector_specificity<Impl>(
builder: &SelectorBuilder<Impl>,
simple_selector: &Component<Impl>,
specificity: &mut Specificity,
) where
Impl: SelectorImpl,
{
match *simple_selector {
Component::Combinator(..) => unreachable!(),
Component::Combinator(ref combinator) => {
unreachable!(
"Found combinator {:?} in simple selectors vector? {:?}",
combinator,
builder,
);
}
// FIXME(emilio): Spec doesn't define any particular specificity for
// ::slotted(), so apply the general rule for pseudos per:
//
@ -326,7 +337,7 @@ where
},
Component::Negation(ref negated) => {
for ss in negated.iter() {
simple_selector_specificity(&ss, specificity);
simple_selector_specificity(builder, &ss, specificity);
}
},
}
@ -334,7 +345,7 @@ where
let mut specificity = Default::default();
for simple_selector in &mut iter {
simple_selector_specificity(&simple_selector, &mut specificity);
simple_selector_specificity(builder, &simple_selector, &mut specificity);
}
specificity
}

Просмотреть файл

@ -92,7 +92,7 @@ macro_rules! with_all_bounds {
/// NB: We need Clone so that we can derive(Clone) on struct with that
/// are parameterized on SelectorImpl. See
/// <https://github.com/rust-lang/rust/issues/26925>
pub trait SelectorImpl: Clone + Sized + 'static {
pub trait SelectorImpl: Clone + Debug + Sized + 'static {
type ExtraMatchingData: Sized + Default + 'static;
type AttrValue: $($InSelector)*;
type Identifier: $($InSelector)*;