Bug 950660: Part 4: Bridge TCPSocketChild to nr_socket r=bwc,jdm

Improve use of TCPSocket to track in-flight writes and suppress extra runnables
Adds lots of logging to nr_socket_buffered_stun.c
Rework mtransport code to use new TCPSocketChild interface
This commit is contained in:
"Chih-Kai (Patrick) Wang" 2015-01-05 15:49:50 +08:00
Родитель 73b9d568ed
Коммит 028e16c84d
8 изменённых файлов: 753 добавлений и 116 удалений

Просмотреть файл

@ -4,6 +4,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
include("/ipc/chromium/chromium-config.mozbuild")
EXPORTS.mtransport += [
'../dtlsidentity.h',
'../m_cpp_utils.h',

Просмотреть файл

@ -4,6 +4,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
include("/ipc/chromium/chromium-config.mozbuild")
DIRS += [
'/media/mtransport/third_party',
'/media/mtransport/build',

Просмотреть файл

@ -106,6 +106,54 @@ nrappkit copyright:
#include "nsXPCOM.h"
#include "nsXULAppAPI.h"
#include "runnable_utils.h"
#include "mozilla/SyncRunnable.h"
#include "nsTArray.h"
#include "mozilla/dom/TCPSocketBinding.h"
#include "nsITCPSocketCallback.h"
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// csi_platform.h deep in nrappkit defines LOG_INFO and LOG_WARNING
#ifdef LOG_INFO
#define LOG_TEMP_INFO LOG_INFO
#undef LOG_INFO
#endif
#ifdef LOG_WARNING
#define LOG_TEMP_WARNING LOG_WARNING
#undef LOG_WARNING
#endif
#if defined(LOG_DEBUG)
#define LOG_TEMP_DEBUG LOG_DEBUG
#undef LOG_DEBUG
#endif
#undef strlcpy
// TCPSocketChild.h doesn't include TypedArray.h
namespace mozilla {
namespace dom {
class ArrayBuffer;
}
}
#include "mozilla/dom/network/TCPSocketChild.h"
#ifdef LOG_TEMP_INFO
#define LOG_INFO LOG_TEMP_INFO
#endif
#ifdef LOG_TEMP_WARNING
#define LOG_WARNING LOG_TEMP_WARNING
#endif
#ifdef LOG_TEMP_DEBUG
#define LOG_DEBUG LOG_TEMP_DEBUG
#endif
#ifdef XP_WIN
#ifdef LOG_DEBUG
#undef LOG_DEBUG
#endif
// cloned from csi_platform.h. Win32 doesn't like how we hide symbols
#define LOG_DEBUG 7
#endif
#endif
extern "C" {
#include "nr_api.h"
@ -203,6 +251,31 @@ static void ClearSingletonOnShutdown()
}
#endif
static nsIThread* GetIOThreadAndAddUse_s()
{
// Always runs on STS thread!
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// We need to safely release this on shutdown to avoid leaks
if (!sThread) {
sThread = new SingletonThreadHolder(NS_LITERAL_CSTRING("mtransport"));
NS_DispatchToMainThread(mozilla::WrapRunnableNM(&ClearSingletonOnShutdown));
}
// Mark that we're using the shared thread and need it to stick around
sThread->AddUse();
return sThread->GetThread();
#else
static nsCOMPtr<nsIThread> sThread;
if (!sThread) {
(void) NS_NewNamedThread("mtransport", getter_AddRefs(sThread));
}
return sThread;
#endif
}
NrSocketIpc::NrSocketIpc(nsIEventTarget *aThread)
: io_thread_(aThread)
{}
static TimeStamp nr_socket_short_term_violation_time;
static TimeStamp nr_socket_long_term_violation_time;
@ -924,10 +997,10 @@ abort:
return(_status);
}
NS_IMPL_ISUPPORTS(NrSocketIpcProxy, nsIUDPSocketInternal)
NS_IMPL_ISUPPORTS(NrUdpSocketIpcProxy, nsIUDPSocketInternal)
nsresult
NrSocketIpcProxy::Init(const nsRefPtr<NrSocketIpc>& socket)
NrUdpSocketIpcProxy::Init(const nsRefPtr<NrUdpSocketIpc>& socket)
{
nsresult rv;
sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
@ -940,7 +1013,7 @@ NrSocketIpcProxy::Init(const nsRefPtr<NrSocketIpc>& socket)
return NS_OK;
}
NrSocketIpcProxy::~NrSocketIpcProxy()
NrUdpSocketIpcProxy::~NrUdpSocketIpcProxy()
{
// Send our ref to STS to be released
RUN_ON_THREAD(sts_thread_,
@ -950,39 +1023,39 @@ NrSocketIpcProxy::~NrSocketIpcProxy()
// IUDPSocketInternal interfaces
// callback while error happened in UDP socket operation
NS_IMETHODIMP NrSocketIpcProxy::CallListenerError(const nsACString &message,
const nsACString &filename,
uint32_t line_number) {
NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerError(const nsACString &message,
const nsACString &filename,
uint32_t line_number) {
return socket_->CallListenerError(message, filename, line_number);
}
// callback while receiving UDP packet
NS_IMETHODIMP NrSocketIpcProxy::CallListenerReceivedData(const nsACString &host,
uint16_t port,
const uint8_t *data,
uint32_t data_length) {
NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerReceivedData(const nsACString &host,
uint16_t port,
const uint8_t *data,
uint32_t data_length) {
return socket_->CallListenerReceivedData(host, port, data, data_length);
}
// callback while UDP socket is opened
NS_IMETHODIMP NrSocketIpcProxy::CallListenerOpened() {
NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerOpened() {
return socket_->CallListenerOpened();
}
// callback while UDP socket is closed
NS_IMETHODIMP NrSocketIpcProxy::CallListenerClosed() {
NS_IMETHODIMP NrUdpSocketIpcProxy::CallListenerClosed() {
return socket_->CallListenerClosed();
}
// NrSocketIpc Implementation
NrSocketIpc::NrSocketIpc()
: err_(false),
state_(NR_INIT),
io_thread_(GetIOThreadAndAddUse_s()),
monitor_("NrSocketIpc") {
// NrUdpSocketIpc Implementation
NrUdpSocketIpc::NrUdpSocketIpc()
: NrSocketIpc(GetIOThreadAndAddUse_s()),
monitor_("NrUdpSocketIpc"),
err_(false),
state_(NR_INIT) {
}
NrSocketIpc::~NrSocketIpc()
NrUdpSocketIpc::~NrUdpSocketIpc()
{
// also guarantees socket_child_ is released from the io_thread, and
// tells the SingletonThreadHolder we're done with it
@ -990,40 +1063,18 @@ NrSocketIpc::~NrSocketIpc()
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// close(), but transfer the socket_child_ reference to die as well
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnableNM(&NrSocketIpc::release_child_i,
mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_child_i,
socket_child_.forget().take(),
sts_thread_),
NS_DISPATCH_NORMAL);
#endif
}
/* static */
nsIThread* NrSocketIpc::GetIOThreadAndAddUse_s()
{
// Always runs on STS thread!
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// We need to safely release this on shutdown to avoid leaks
if (!sThread) {
sThread = new SingletonThreadHolder(NS_LITERAL_CSTRING("mtransport"));
NS_DispatchToMainThread(mozilla::WrapRunnableNM(&ClearSingletonOnShutdown));
}
// Mark that we're using the shared thread and need it to stick around
sThread->AddUse();
return sThread->GetThread();
#else
static nsCOMPtr<nsIThread> sThread;
if (!sThread) {
(void) NS_NewNamedThread("mtransport", getter_AddRefs(sThread));
}
return sThread;
#endif
}
// IUDPSocketInternal interfaces
// callback while error happened in UDP socket operation
NS_IMETHODIMP NrSocketIpc::CallListenerError(const nsACString &message,
const nsACString &filename,
uint32_t line_number) {
NS_IMETHODIMP NrUdpSocketIpc::CallListenerError(const nsACString &message,
const nsACString &filename,
uint32_t line_number) {
ASSERT_ON_THREAD(io_thread_);
r_log(LOG_GENERIC, LOG_ERR, "UDP socket error:%s at %s:%d",
@ -1037,10 +1088,10 @@ NS_IMETHODIMP NrSocketIpc::CallListenerError(const nsACString &message,
}
// callback while receiving UDP packet
NS_IMETHODIMP NrSocketIpc::CallListenerReceivedData(const nsACString &host,
uint16_t port,
const uint8_t *data,
uint32_t data_length) {
NS_IMETHODIMP NrUdpSocketIpc::CallListenerReceivedData(const nsACString &host,
uint16_t port,
const uint8_t *data,
uint32_t data_length) {
ASSERT_ON_THREAD(io_thread_);
PRNetAddr addr;
@ -1067,15 +1118,15 @@ NS_IMETHODIMP NrSocketIpc::CallListenerReceivedData(const nsACString &host,
RefPtr<nr_udp_message> msg(new nr_udp_message(addr, buf));
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::recv_callback_s,
mozilla::WrapRunnable(nsRefPtr<NrUdpSocketIpc>(this),
&NrUdpSocketIpc::recv_callback_s,
msg),
NS_DISPATCH_NORMAL);
return NS_OK;
}
// callback while UDP socket is opened
NS_IMETHODIMP NrSocketIpc::CallListenerOpened() {
NS_IMETHODIMP NrUdpSocketIpc::CallListenerOpened() {
ASSERT_ON_THREAD(io_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
@ -1129,7 +1180,7 @@ NS_IMETHODIMP NrSocketIpc::CallListenerOpened() {
}
// callback while UDP socket is closed
NS_IMETHODIMP NrSocketIpc::CallListenerClosed() {
NS_IMETHODIMP NrUdpSocketIpc::CallListenerClosed() {
ASSERT_ON_THREAD(io_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
@ -1140,8 +1191,10 @@ NS_IMETHODIMP NrSocketIpc::CallListenerClosed() {
return NS_OK;
}
// nr_socket public APIs
int NrSocketIpc::create(nr_transport_addr *addr) {
//
// NrSocketBase methods.
//
int NrUdpSocketIpc::create(nr_transport_addr *addr) {
ASSERT_ON_THREAD(sts_thread_);
int r, _status;
@ -1155,12 +1208,6 @@ int NrSocketIpc::create(nr_transport_addr *addr) {
ABORT(R_INTERNAL);
}
// Bug 950660: Remote TCP socket is not supported yet.
if (NS_WARN_IF(addr->protocol != IPPROTO_UDP)) {
MOZ_ASSERT(false, "NrSocket over TCP is not e10s ready, see Bug 950660");
ABORT(R_INTERNAL);
}
sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_FAILED(rv)) {
MOZ_ASSERT(false, "Failed to get STS thread");
@ -1171,7 +1218,7 @@ int NrSocketIpc::create(nr_transport_addr *addr) {
ABORT(r);
}
// wildcard address will be resolved at NrSocketIpc::CallListenerVoid
// wildcard address will be resolved at NrUdpSocketIpc::CallListenerVoid
if ((r=nr_transport_addr_copy(&my_addr_, addr))) {
ABORT(r);
}
@ -1179,8 +1226,8 @@ int NrSocketIpc::create(nr_transport_addr *addr) {
state_ = NR_CONNECTING;
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::create_i,
mozilla::WrapRunnable(nsRefPtr<NrUdpSocketIpc>(this),
&NrUdpSocketIpc::create_i,
host, static_cast<uint16_t>(port)),
NS_DISPATCH_NORMAL);
@ -1198,7 +1245,7 @@ abort:
return(_status);
}
int NrSocketIpc::sendto(const void *msg, size_t len, int flags,
int NrUdpSocketIpc::sendto(const void *msg, size_t len, int flags,
nr_transport_addr *to) {
ASSERT_ON_THREAD(sts_thread_);
@ -1222,22 +1269,22 @@ int NrSocketIpc::sendto(const void *msg, size_t len, int flags,
nsAutoPtr<DataBuffer> buf(new DataBuffer(static_cast<const uint8_t*>(msg), len));
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::sendto_i,
mozilla::WrapRunnable(nsRefPtr<NrUdpSocketIpc>(this),
&NrUdpSocketIpc::sendto_i,
addr, buf),
NS_DISPATCH_NORMAL);
return 0;
}
void NrSocketIpc::close() {
void NrUdpSocketIpc::close() {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
state_ = NR_CLOSING;
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrSocketIpc>(this),
&NrSocketIpc::close_i),
mozilla::WrapRunnable(nsRefPtr<NrUdpSocketIpc>(this),
&NrUdpSocketIpc::close_i),
NS_DISPATCH_NORMAL);
//remove all enqueued messages
@ -1245,7 +1292,7 @@ void NrSocketIpc::close() {
std::swap(received_msgs_, empty);
}
int NrSocketIpc::recvfrom(void *buf, size_t maxlen, size_t *len, int flags,
int NrUdpSocketIpc::recvfrom(void *buf, size_t maxlen, size_t *len, int flags,
nr_transport_addr *from) {
ASSERT_ON_THREAD(sts_thread_);
@ -1289,7 +1336,7 @@ abort:
return(_status);
}
int NrSocketIpc::getaddr(nr_transport_addr *addrp) {
int NrUdpSocketIpc::getaddr(nr_transport_addr *addrp) {
ASSERT_ON_THREAD(sts_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
@ -1301,38 +1348,39 @@ int NrSocketIpc::getaddr(nr_transport_addr *addrp) {
return nr_transport_addr_copy(addrp, &my_addr_);
}
int NrSocketIpc::connect(nr_transport_addr *addr) {
int NrUdpSocketIpc::connect(nr_transport_addr *addr) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrSocketIpc::write(const void *msg, size_t len, size_t *written) {
int NrUdpSocketIpc::write(const void *msg, size_t len, size_t *written) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrSocketIpc::read(void* buf, size_t maxlen, size_t *len) {
int NrUdpSocketIpc::read(void* buf, size_t maxlen, size_t *len) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrSocketIpc::listen(int backlog) {
int NrUdpSocketIpc::listen(int backlog) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) {
int NrUdpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
// IO thread executors
void NrSocketIpc::create_i(const nsACString &host, const uint16_t port) {
void NrUdpSocketIpc::create_i(const nsACString &host, const uint16_t port) {
ASSERT_ON_THREAD(io_thread_);
nsresult rv;
nsCOMPtr<nsIUDPSocketChild> socketChild = do_CreateInstance("@mozilla.org/udp-socket-child;1", &rv);
if (NS_FAILED(rv)) {
ReentrantMonitorAutoEnter mon(monitor_);
err_ = true;
MOZ_ASSERT(false, "Failed to create UDPSocketChild");
return;
@ -1349,7 +1397,7 @@ void NrSocketIpc::create_i(const nsACString &host, const uint16_t port) {
socketChild = nullptr;
}
nsRefPtr<NrSocketIpcProxy> proxy(new NrSocketIpcProxy);
nsRefPtr<NrUdpSocketIpcProxy> proxy(new NrUdpSocketIpcProxy);
rv = proxy->Init(this);
if (NS_FAILED(rv)) {
err_ = true;
@ -1368,17 +1416,16 @@ void NrSocketIpc::create_i(const nsACString &host, const uint16_t port) {
}
}
void NrSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) {
void NrUdpSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf) {
ASSERT_ON_THREAD(io_thread_);
ReentrantMonitorAutoEnter mon(monitor_);
if (!socket_child_) {
MOZ_ASSERT(false);
err_ = true;
return;
}
ReentrantMonitorAutoEnter mon(monitor_);
if (NS_FAILED(socket_child_->SendWithAddress(&addr,
buf->data(),
buf->len()))) {
@ -1386,7 +1433,7 @@ void NrSocketIpc::sendto_i(const net::NetAddr &addr, nsAutoPtr<DataBuffer> buf)
}
}
void NrSocketIpc::close_i() {
void NrUdpSocketIpc::close_i() {
ASSERT_ON_THREAD(io_thread_);
if (socket_child_) {
@ -1398,8 +1445,8 @@ void NrSocketIpc::close_i() {
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// close(), but transfer the socket_child_ reference to die as well
// static
void NrSocketIpc::release_child_i(nsIUDPSocketChild* aChild,
nsCOMPtr<nsIEventTarget> sts_thread) {
void NrUdpSocketIpc::release_child_i(nsIUDPSocketChild* aChild,
nsCOMPtr<nsIEventTarget> sts_thread) {
nsRefPtr<nsIUDPSocketChild> socket_child_ref =
already_AddRefed<nsIUDPSocketChild>(aChild);
if (socket_child_ref) {
@ -1407,16 +1454,16 @@ void NrSocketIpc::release_child_i(nsIUDPSocketChild* aChild,
}
// Tell SingletonThreadHolder we're done with it
RUN_ON_THREAD(sts_thread,
mozilla::WrapRunnableNM(&NrSocketIpc::release_use_s),
mozilla::WrapRunnableNM(&NrUdpSocketIpc::release_use_s),
NS_DISPATCH_NORMAL);
}
void NrSocketIpc::release_use_s() {
void NrUdpSocketIpc::release_use_s() {
sThread->ReleaseUse();
}
#endif
void NrSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) {
void NrUdpSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) {
ASSERT_ON_THREAD(sts_thread_);
{
@ -1434,6 +1481,448 @@ void NrSocketIpc::recv_callback_s(RefPtr<nr_udp_message> msg) {
}
}
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
// TCPSocket.
class NrTcpSocketIpc::TcpSocketReadyRunner: public nsRunnable
{
public:
explicit TcpSocketReadyRunner(NrTcpSocketIpc *sck)
: socket_(sck) {}
NS_IMETHODIMP Run() {
socket_->maybe_post_socket_ready();
return NS_OK;
}
private:
nsRefPtr<NrTcpSocketIpc> socket_;
};
NS_IMPL_ISUPPORTS(NrTcpSocketIpc,
nsITCPSocketCallback)
NrTcpSocketIpc::NrTcpSocketIpc(nsIThread* aThread)
: NrSocketIpc(static_cast<nsIEventTarget*>(aThread)),
mirror_state_(NR_INIT),
state_(NR_INIT),
buffered_bytes_(0),
tracking_number_(0) {
}
NrTcpSocketIpc::~NrTcpSocketIpc()
{
// also guarantees socket_child_ is released from the io_thread
// close(), but transfer the socket_child_ reference to die as well
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnableNM(&NrTcpSocketIpc::release_child_i,
socket_child_.forget().take(),
sts_thread_),
NS_DISPATCH_NORMAL);
}
//
// nsITCPSocketCallback methods
//
NS_IMETHODIMP NrTcpSocketIpc::UpdateReadyState(uint32_t aReadyState) {
NrSocketIpcState temp = NR_INIT;
switch (static_cast<dom::TCPReadyState>(aReadyState)) {
case dom::TCPReadyState::Connecting:
temp = NR_CONNECTING;
break;
case dom::TCPReadyState::Open:
temp = NR_CONNECTED;
break;
case dom::TCPReadyState::Closing:
temp = NR_CLOSING;
break;
case dom::TCPReadyState::Closed:
temp = NR_CLOSED;
break;
default:
MOZ_ASSERT(false, "Invalid ReadyState");
return NS_OK;
}
if (mirror_state_ != temp) {
mirror_state_ = temp;
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::update_state_s,
temp),
NS_DISPATCH_NORMAL);
}
return NS_OK;
}
NS_IMETHODIMP NrTcpSocketIpc::UpdateBufferedAmount(uint32_t buffered_amount,
uint32_t tracking_number) {
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::message_sent_s,
buffered_amount,
tracking_number),
NS_DISPATCH_NORMAL);
return NS_OK;
}
NS_IMETHODIMP NrTcpSocketIpc::FireDataArrayEvent(const nsAString& aType,
const InfallibleTArray<uint8_t>& buffer) {
// Called when we received data.
uint8_t *buf = const_cast<uint8_t*>(buffer.Elements());
nsAutoPtr<DataBuffer> data_buf(new DataBuffer(buf, buffer.Length()));
nsRefPtr<nr_tcp_message> msg = new nr_tcp_message(data_buf);
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::recv_message_s,
msg),
NS_DISPATCH_NORMAL);
return NS_OK;
}
NS_IMETHODIMP NrTcpSocketIpc::FireErrorEvent(const nsAString &type,
const nsAString &name) {
r_log(LOG_GENERIC, LOG_ERR,
"Error from TCPSocketChild: type: %s, name: %s",
NS_LossyConvertUTF16toASCII(type).get(), NS_LossyConvertUTF16toASCII(name).get());
socket_child_ = nullptr;
mirror_state_ = NR_CLOSED;
RUN_ON_THREAD(sts_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::update_state_s,
NR_CLOSED),
NS_DISPATCH_NORMAL);
return NS_OK;
}
// methods of nsITCPSocketCallback that we are not going to implement.
NS_IMETHODIMP NrTcpSocketIpc::FireDataEvent(JSContext* aCx,
const nsAString &type,
const JS::HandleValue data) {
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP NrTcpSocketIpc::FireDataStringEvent(const nsAString &type,
const nsACString &data) {
return NS_ERROR_NOT_IMPLEMENTED;
}
NS_IMETHODIMP NrTcpSocketIpc::FireEvent(const nsAString &type) {
// XXX support type.mData == 'close' at least
return NS_ERROR_NOT_IMPLEMENTED;
}
//
// NrSocketBase methods.
//
int NrTcpSocketIpc::create(nr_transport_addr *addr) {
int r, _status;
nsresult rv;
int32_t port;
nsCString host;
if (state_ != NR_INIT) {
ABORT(R_INTERNAL);
}
sts_thread_ = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
if (NS_FAILED(rv)) {
MOZ_ASSERT(false, "Failed to get STS thread");
ABORT(R_INTERNAL);
}
// Sanity check
if ((r=nr_transport_addr_get_addrstring_and_port(addr, &host, &port))) {
ABORT(r);
}
if ((r=nr_transport_addr_copy(&my_addr_, addr))) {
ABORT(r);
}
_status = 0;
abort:
return(_status);
}
int NrTcpSocketIpc::sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrTcpSocketIpc::recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrTcpSocketIpc::getaddr(nr_transport_addr *addrp) {
ASSERT_ON_THREAD(sts_thread_);
return nr_transport_addr_copy(addrp, &my_addr_);
}
void NrTcpSocketIpc::close() {
ASSERT_ON_THREAD(sts_thread_);
if (state_ == NR_CLOSED || state_ == NR_CLOSING) {
return;
}
state_ = NR_CLOSING;
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::close_i),
NS_DISPATCH_NORMAL);
//remove all enqueued messages
std::queue<RefPtr<nr_tcp_message>> empty;
std::swap(msg_queue_, empty);
}
int NrTcpSocketIpc::connect(nr_transport_addr *addr) {
nsCString remote_addr, local_addr;
int32_t remote_port, local_port;
int r, _status;
if ((r=nr_transport_addr_get_addrstring_and_port(addr,
&remote_addr,
&remote_port))) {
ABORT(r);
}
if ((r=nr_transport_addr_get_addrstring_and_port(&my_addr_,
&local_addr,
&local_port))) {
MOZ_ASSERT(false); // shouldn't fail as it was sanity-checked in ::create()
ABORT(r);
}
state_ = mirror_state_ = NR_CONNECTING;
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::connect_i,
remote_addr,
static_cast<uint16_t>(remote_port),
local_addr,
static_cast<uint16_t>(local_port)),
NS_DISPATCH_NORMAL);
// Make caller wait for ready to write.
_status = R_WOULDBLOCK;
abort:
return _status;
}
int NrTcpSocketIpc::write(const void *msg, size_t len, size_t *written) {
ASSERT_ON_THREAD(sts_thread_);
int _status = 0;
if (state_ != NR_CONNECTED) {
ABORT(R_FAILED);
}
if (buffered_bytes_ + len >= nsITCPSocketCallback::BUFFER_SIZE) {
ABORT(R_WOULDBLOCK);
}
buffered_bytes_ += len;
{
InfallibleTArray<uint8_t>* arr = new InfallibleTArray<uint8_t>();
arr->AppendElements(static_cast<const uint8_t*>(msg), len);
// keep track of un-acknowleged writes by tracking number.
writes_in_flight_.push_back(len);
RUN_ON_THREAD(io_thread_,
mozilla::WrapRunnable(nsRefPtr<NrTcpSocketIpc>(this),
&NrTcpSocketIpc::write_i,
nsAutoPtr<InfallibleTArray<uint8_t>>(arr),
++tracking_number_),
NS_DISPATCH_NORMAL);
}
*written = len;
abort:
return _status;
}
int NrTcpSocketIpc::read(void* buf, size_t maxlen, size_t *len) {
int _status = 0;
if (state_ != NR_CONNECTED) {
ABORT(R_FAILED);
}
if (msg_queue_.size() == 0) {
ABORT(R_WOULDBLOCK);
}
{
nsRefPtr<nr_tcp_message> msg(msg_queue_.front());
size_t consumed_len = std::min(maxlen, msg->unread_bytes());
memcpy(buf, msg->reading_pointer(), consumed_len);
if (consumed_len < msg->unread_bytes()) {
// There is still something left in buffer.
msg->read_bytes += consumed_len;
} else {
msg_queue_.pop();
}
*len = consumed_len;
}
abort:
return _status;
}
int NrTcpSocketIpc::listen(int backlog) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
int NrTcpSocketIpc::accept(nr_transport_addr *addrp, nr_socket **sockp) {
MOZ_ASSERT(false);
return R_INTERNAL;
}
void NrTcpSocketIpc::connect_i(const nsACString &remote_addr,
uint16_t remote_port,
const nsACString &local_addr,
uint16_t local_port) {
ASSERT_ON_THREAD(io_thread_);
mirror_state_ = NR_CONNECTING;
dom::TCPSocketChild* child = new dom::TCPSocketChild(NS_ConvertUTF8toUTF16(remote_addr), remote_port);
socket_child_ = child;
// XXX remove remote!
socket_child_->SendWindowlessOpenBind(this,
remote_addr, remote_port,
local_addr, local_port,
/* use ssl */ false);
}
void NrTcpSocketIpc::write_i(nsAutoPtr<InfallibleTArray<uint8_t>> arr,
uint32_t tracking_number) {
ASSERT_ON_THREAD(io_thread_);
if (!socket_child_) {
return;
}
socket_child_->SendSendArray(*arr, tracking_number);
}
void NrTcpSocketIpc::close_i() {
ASSERT_ON_THREAD(io_thread_);
mirror_state_ = NR_CLOSING;
if (!socket_child_) {
return;
}
socket_child_->SendClose();
}
// close(), but transfer the socket_child_ reference to die as well
// static
void NrTcpSocketIpc::release_child_i(dom::TCPSocketChild* aChild,
nsCOMPtr<nsIEventTarget> sts_thread) {
nsRefPtr<dom::TCPSocketChild> socket_child_ref =
already_AddRefed<dom::TCPSocketChild>(aChild);
if (socket_child_ref) {
socket_child_ref->SendClose();
}
// io_thread_ is MainThread, so no use to release
}
void NrTcpSocketIpc::message_sent_s(uint32_t buffered_amount,
uint32_t tracking_number) {
ASSERT_ON_THREAD(sts_thread_);
size_t num_unacked_writes = tracking_number_ - tracking_number;
while (writes_in_flight_.size() > num_unacked_writes) {
writes_in_flight_.pop_front();
}
for (size_t unacked_write_len : writes_in_flight_) {
buffered_amount += unacked_write_len;
}
r_log(LOG_GENERIC, LOG_ERR,
"UpdateBufferedAmount: (tracking %u): %u, waiting: %s",
tracking_number, buffered_amount,
(poll_flags() & PR_POLL_WRITE) ? "yes" : "no");
buffered_bytes_ = buffered_amount;
maybe_post_socket_ready();
}
void NrTcpSocketIpc::recv_message_s(nr_tcp_message *msg) {
ASSERT_ON_THREAD(sts_thread_);
msg_queue_.push(msg);
maybe_post_socket_ready();
}
void NrTcpSocketIpc::update_state_s(NrSocketIpcState next_state) {
ASSERT_ON_THREAD(sts_thread_);
// only allow valid transitions
switch (state_) {
case NR_CONNECTING:
if (next_state == NR_CONNECTED) {
state_ = NR_CONNECTED;
maybe_post_socket_ready();
} else {
state_ = next_state; // all states are valid from CONNECTING
}
break;
case NR_CONNECTED:
if (next_state != NR_CONNECTING) {
state_ = next_state;
}
break;
case NR_CLOSING:
if (next_state == NR_CLOSED) {
state_ = next_state;
}
break;
case NR_CLOSED:
break;
default:
MOZ_CRASH("update_state_s while in illegal state");
}
}
void NrTcpSocketIpc::maybe_post_socket_ready() {
bool has_event = false;
if (state_ == NR_CONNECTED) {
if (poll_flags() & PR_POLL_WRITE) {
// This effectively polls via the event loop until the
// NR_ASYNC_WAIT_WRITE is no longer armed.
if (buffered_bytes_ < nsITCPSocketCallback::BUFFER_SIZE) {
r_log(LOG_GENERIC, LOG_INFO, "Firing write callback (%u)",
(uint32_t)buffered_bytes_);
fire_callback(NR_ASYNC_WAIT_WRITE);
has_event = true;
}
}
if (poll_flags() & PR_POLL_READ) {
if (msg_queue_.size()) {
r_log(LOG_GENERIC, LOG_INFO, "Firing read callback (%u)",
(uint32_t)msg_queue_.size());
fire_callback(NR_ASYNC_WAIT_READ);
has_event = true;
}
}
}
// If any event has been posted, we post a runnable to see
// if the events have to be posted again.
if (has_event) {
nsRefPtr<TcpSocketReadyRunner> runnable = new TcpSocketReadyRunner(this);
NS_DispatchToCurrentThread(runnable);
}
}
#endif
} // close namespace
@ -1474,16 +1963,30 @@ static nr_socket_vtbl nr_socket_local_vtbl={
int nr_socket_local_create(void *obj, nr_transport_addr *addr, nr_socket **sockp) {
RefPtr<NrSocketBase> sock;
int r, _status;
// create IPC bridge for content process
if (XRE_IsParentProcess()) {
sock = new NrSocket();
} else {
sock = new NrSocketIpc();
switch (addr->protocol) {
case IPPROTO_UDP:
sock = new NrUdpSocketIpc();
break;
case IPPROTO_TCP:
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
{
nsCOMPtr<nsIThread> main_thread;
NS_GetMainThread(getter_AddRefs(main_thread));
sock = new NrTcpSocketIpc(main_thread.get());
}
#else
ABORT(R_REJECTED);
#endif
break;
}
}
int r, _status;
r = sock->create(addr);
if (r)
ABORT(r);

Просмотреть файл

@ -61,6 +61,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "nsProxyRelease.h"
#include "nsThreadUtils.h"
#include "nsITCPSocketCallback.h"
#include "databuffer.h"
#include "m_cpp_utils.h"
#include "mozilla/ReentrantMonitor.h"
@ -72,6 +73,14 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
typedef struct nr_socket_vtbl_ nr_socket_vtbl;
typedef struct nr_socket_ nr_socket;
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
namespace mozilla {
namespace dom {
class TCPSocketChild;
}
}
#endif
namespace mozilla {
namespace net {
@ -209,7 +218,22 @@ public:
NR_CLOSED,
};
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(NrSocketIpc, override)
NrSocketIpc(nsIEventTarget* aThread);
protected:
nsCOMPtr<nsIEventTarget> sts_thread_;
// Note: for UDP PBackground, this is a thread held by SingletonThreadHolder.
// For TCP PNecko, this is MainThread (and TCPSocket requires MainThread currently)
const nsCOMPtr<nsIEventTarget> io_thread_;
virtual ~NrSocketIpc() {};
private:
DISALLOW_COPY_ASSIGN(NrSocketIpc);
};
class NrUdpSocketIpc : public NrSocketIpc {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(NrUdpSocketIpc, override)
NS_IMETHODIMP CallListenerError(const nsACString &message,
const nsACString &filename,
@ -221,7 +245,7 @@ public:
NS_IMETHODIMP CallListenerOpened();
NS_IMETHODIMP CallListenerClosed();
NrSocketIpc();
NrUdpSocketIpc();
// Implementations of the NrSocketBase APIs
virtual int create(nr_transport_addr *addr) override;
@ -239,11 +263,9 @@ public:
virtual int accept(nr_transport_addr *addrp, nr_socket **sockp) override;
private:
virtual ~NrSocketIpc();
virtual ~NrUdpSocketIpc();
DISALLOW_COPY_ASSIGN(NrSocketIpc);
static nsIThread* GetIOThreadAndAddUse_s();
DISALLOW_COPY_ASSIGN(NrUdpSocketIpc);
// Main or private thread executors of the NrSocketBase APIs
void create_i(const nsACString &host, const uint16_t port);
@ -256,32 +278,118 @@ private:
// STS thread executor
void recv_callback_s(RefPtr<nr_udp_message> msg);
ReentrantMonitor monitor_; // protects err_and state_
bool err_;
NrSocketIpcState state_;
std::queue<RefPtr<nr_udp_message> > received_msgs_;
std::queue<RefPtr<nr_udp_message>> received_msgs_;
nsRefPtr<nsIUDPSocketChild> socket_child_; // only accessed from the io_thread
nsCOMPtr<nsIEventTarget> sts_thread_;
const nsCOMPtr<nsIEventTarget> io_thread_;
ReentrantMonitor monitor_;
};
// The socket child holds onto one of these, which just passes callbacks
// through and makes sure the ref to the NrSocketIpc is released on STS.
class NrSocketIpcProxy : public nsIUDPSocketInternal {
class NrUdpSocketIpcProxy : public nsIUDPSocketInternal {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSIUDPSOCKETINTERNAL
nsresult Init(const nsRefPtr<NrSocketIpc>& socket);
nsresult Init(const nsRefPtr<NrUdpSocketIpc>& socket);
private:
virtual ~NrSocketIpcProxy();
virtual ~NrUdpSocketIpcProxy();
nsRefPtr<NrSocketIpc> socket_;
nsRefPtr<NrUdpSocketIpc> socket_;
nsCOMPtr<nsIEventTarget> sts_thread_;
};
struct nr_tcp_message {
explicit nr_tcp_message(nsAutoPtr<DataBuffer> &data)
: read_bytes(0)
, data(data) {
}
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(nr_tcp_message);
const uint8_t *reading_pointer() const {
return data->data() + read_bytes;
}
size_t unread_bytes() const {
return data->len() - read_bytes;
}
size_t read_bytes;
private:
~nr_tcp_message() {}
DISALLOW_COPY_ASSIGN(nr_tcp_message);
nsAutoPtr<DataBuffer> data;
};
#if defined(MOZILLA_INTERNAL_API) && !defined(MOZILLA_XPCOMRT_API)
class NrTcpSocketIpc : public NrSocketIpc,
public nsITCPSocketCallback {
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_NSITCPSOCKETCALLBACK
explicit NrTcpSocketIpc(nsIThread* aThread);
// Implementations of the NrSocketBase APIs
virtual int create(nr_transport_addr *addr) override;
virtual int sendto(const void *msg, size_t len,
int flags, nr_transport_addr *to) override;
virtual int recvfrom(void * buf, size_t maxlen,
size_t *len, int flags,
nr_transport_addr *from) override;
virtual int getaddr(nr_transport_addr *addrp) override;
virtual void close() override;
virtual int connect(nr_transport_addr *addr) override;
virtual int write(const void *msg, size_t len, size_t *written) override;
virtual int read(void* buf, size_t maxlen, size_t *len) override;
virtual int listen(int backlog) override;
virtual int accept(nr_transport_addr *addrp, nr_socket **sockp) override;
private:
class TcpSocketReadyRunner;
DISALLOW_COPY_ASSIGN(NrTcpSocketIpc);
virtual ~NrTcpSocketIpc();
// Main thread executors of the NrSocketBase APIs
void connect_i(const nsACString &remote_addr,
uint16_t remote_port,
const nsACString &local_addr,
uint16_t local_port);
void write_i(nsAutoPtr<InfallibleTArray<uint8_t>> buf,
uint32_t tracking_number);
void close_i();
static void release_child_i(dom::TCPSocketChild* aChild, nsCOMPtr<nsIEventTarget> ststhread);
// STS thread executor
void message_sent_s(uint32_t bufferedAmount, uint32_t tracking_number);
void recv_message_s(nr_tcp_message *msg);
void update_state_s(NrSocketIpcState next_state);
void maybe_post_socket_ready();
// Accessed from UpdateReadyState (not sts_thread) to avoid sending
// runnables when not needed
NrSocketIpcState mirror_state_;
// variables that can only be accessed on STS.
NrSocketIpcState state_;
std::queue<RefPtr<nr_tcp_message>> msg_queue_;
uint32_t buffered_bytes_;
uint32_t tracking_number_;
std::deque<size_t> writes_in_flight_;
// main thread.
nsRefPtr<dom::TCPSocketChild> socket_child_;
};
#endif
int nr_netaddr_to_transport_addr(const net::NetAddr *netaddr,
nr_transport_addr *addr,
int protocol);

Просмотреть файл

@ -7,6 +7,7 @@
Library('mtransport_standalone')
include('../common.build')
include("/ipc/chromium/chromium-config.mozbuild")
# These files cannot be built in unified mode because of the redefinition of
# getLogModule, UNIMPLEMENTED, nr_socket_long_term_violation_time,

Просмотреть файл

@ -5,6 +5,7 @@
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
include('../common.build')
include("/ipc/chromium/chromium-config.mozbuild")
# These files cannot be built in unified mode because of the redefinition of
# getLogModule, UNIMPLEMENTED, nr_socket_long_term_violation_time,

Просмотреть файл

@ -38,6 +38,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <sys/types.h>
#include <sys/queue.h>
#include <assert.h>
#include <inttypes.h>
#include "p_buf.h"
#include "nr_socket.h"
@ -374,8 +375,10 @@ static void nr_socket_buffered_stun_connected_cb(NR_SOCKET s, int how, void *arg
assert(!sock->connected);
sock->connected = 1;
if (sock->pending)
if (sock->pending) {
r_log(LOG_GENERIC, LOG_INFO, "Invoking writable_cb on connected (%u)", (uint32_t) sock->pending);
nr_socket_buffered_stun_writable_cb(s, how, arg);
}
}
static int nr_socket_buffered_stun_connect(void *obj, nr_transport_addr *addr)
@ -398,6 +401,7 @@ static int nr_socket_buffered_stun_connect(void *obj, nr_transport_addr *addr)
}
ABORT(r);
} else {
r_log(LOG_GENERIC, LOG_INFO, "Connected without blocking");
sock->connected = 1;
}
@ -432,7 +436,9 @@ static int nr_socket_buffered_stun_write(void *obj,const void *msg, size_t len,
/* Buffers are close to full, report error. Do this now so we never
get partial writes */
if ((sock->pending + len) > sock->max_pending) {
r_log(LOG_GENERIC, LOG_INFO, "Write buffer for %s full", sock->remote_addr.as_string);
r_log(LOG_GENERIC, LOG_INFO, "Write buffer for %s full (%u + %u > %u) - re-arming @%p",
sock->remote_addr.as_string, (uint32_t)sock->pending, (uint32_t)len, (uint32_t)sock->max_pending,
&(sock->pending));
ABORT(R_WOULDBLOCK);
}
@ -440,8 +446,13 @@ static int nr_socket_buffered_stun_write(void *obj,const void *msg, size_t len,
if (sock->connected && !sock->pending) {
r = nr_socket_write(sock->inner, msg, len, &written2, 0);
if (r) {
if (r != R_WOULDBLOCK)
if (r != R_WOULDBLOCK) {
r_log(LOG_GENERIC, LOG_ERR, "Write error for %s - %d",
sock->remote_addr.as_string, r);
ABORT(r);
}
r_log(LOG_GENERIC, LOG_INFO, "Write of %" PRIu64 " blocked for %s",
(uint64_t) len, sock->remote_addr.as_string);
written2=0;
}
@ -454,8 +465,12 @@ static int nr_socket_buffered_stun_write(void *obj,const void *msg, size_t len,
if (len) {
if ((r=nr_p_buf_write_to_chain(sock->p_bufs, &sock->pending_writes,
((UCHAR *)msg) + written2, len)))
((UCHAR *)msg) + written2, len))) {
r_log(LOG_GENERIC, LOG_ERR, "Write_to_chain error for %s - %d",
sock->remote_addr.as_string, r);
ABORT(r);
}
sock->pending += len;
}
@ -464,6 +479,9 @@ static int nr_socket_buffered_stun_write(void *obj,const void *msg, size_t len,
if ((r=nr_socket_buffered_stun_arm_writable_cb(sock)))
ABORT(r);
}
r_log(LOG_GENERIC, LOG_INFO, "Write buffer not empty for %s %u - %s armed (@%p)",
sock->remote_addr.as_string, (uint32_t)sock->pending,
already_armed ? "already" : "", &sock->pending);
*written = original_len;
@ -486,6 +504,8 @@ static void nr_socket_buffered_stun_writable_cb(NR_SOCKET s, int how, void *arg)
n1->length - n1->r_offset,
&written, 0))) {
r_log(LOG_GENERIC, LOG_ERR, "Write error for %s - %d",
sock->remote_addr.as_string, r);
ABORT(r);
}
@ -495,6 +515,9 @@ static void nr_socket_buffered_stun_writable_cb(NR_SOCKET s, int how, void *arg)
if (n1->r_offset < n1->length) {
/* We wrote something, but not everything */
r_log(LOG_GENERIC, LOG_INFO, "Write in callback didn't write all (remaining %u of %u) for %s",
n1->length - n1->r_offset, n1->length,
sock->remote_addr.as_string);
ABORT(R_WOULDBLOCK);
}
@ -506,7 +529,10 @@ static void nr_socket_buffered_stun_writable_cb(NR_SOCKET s, int how, void *arg)
assert(!sock->pending);
_status=0;
abort:
r_log(LOG_GENERIC, LOG_INFO, "Writable_cb %s (%u (%p) pending)",
sock->remote_addr.as_string, (uint32_t)sock->pending, &(sock->pending));
if (_status && _status != R_WOULDBLOCK) {
r_log(LOG_GENERIC, LOG_ERR, "Failure in writable_cb: %d", _status);
nr_socket_buffered_stun_failed(sock);
} else if (sock->pending) {
nr_socket_buffered_stun_arm_writable_cb(sock);

Просмотреть файл

@ -622,12 +622,6 @@ PeerConnectionConfiguration::AddIceServer(const RTCIceServer &aServer)
NS_ConvertUTF16toUTF8 credential(aServer.mCredential);
NS_ConvertUTF16toUTF8 username(aServer.mUsername);
// Bug 1039655 - TURN TCP is not e10s ready
if ((transport == kNrIceTransportTcp) &&
(!XRE_IsParentProcess())) {
continue;
}
if (!addTurnServer(host.get(), port,
username.get(),
credential.get(),