diff --git a/ipc/unixsocket/UnixSocket.cpp b/ipc/unixsocket/UnixSocket.cpp index 4d7b262723eb..31ac1b32ff6b 100644 --- a/ipc/unixsocket/UnixSocket.cpp +++ b/ipc/unixsocket/UnixSocket.cpp @@ -16,113 +16,55 @@ namespace mozilla { namespace ipc { // -// UnixSocketImpl +// UnixSocketConsumerIO // -class UnixSocketImpl : public UnixSocketWatcher - , protected SocketIOBase +class UnixSocketConsumerIO MOZ_FINAL : public UnixSocketWatcher + , protected SocketIOBase { public: - UnixSocketImpl(MessageLoop* mIOLoop, - UnixSocketConsumer* aConsumer, UnixSocketConnector* aConnector, - const nsACString& aAddress) - : UnixSocketWatcher(mIOLoop) - , SocketIOBase(MAX_READ_SIZE) - , mConsumer(aConsumer) - , mConnector(aConnector) - , mShuttingDownOnIOThread(false) - , mAddress(aAddress) - , mDelayedConnectTask(nullptr) - { - } + UnixSocketConsumerIO(MessageLoop* mIOLoop, + UnixSocketConsumer* aConsumer, + UnixSocketConnector* aConnector, + const nsACString& aAddress); + ~UnixSocketConsumerIO(); - ~UnixSocketImpl() - { - MOZ_ASSERT(NS_IsMainThread()); - MOZ_ASSERT(IsShutdownOnMainThread()); - } + void GetSocketAddr(nsAString& aAddrStr) const; + SocketConsumerBase* GetConsumer(); - void Send(UnixSocketRawData* aData) - { - EnqueueData(aData); - AddWatchers(WRITE_WATCHER, false); - } + // Shutdown state + // - bool IsShutdownOnMainThread() - { - MOZ_ASSERT(NS_IsMainThread()); - return mConsumer == nullptr; - } + bool IsShutdownOnMainThread() const; + void ShutdownOnMainThread(); - void ShutdownOnMainThread() - { - MOZ_ASSERT(NS_IsMainThread()); - MOZ_ASSERT(!IsShutdownOnMainThread()); - mConsumer = nullptr; - } + bool IsShutdownOnIOThread() const; + void ShutdownOnIOThread(); - bool IsShutdownOnIOThread() - { - return mShuttingDownOnIOThread; - } + // Delayed-task handling + // - void ShutdownOnIOThread() - { - MOZ_ASSERT(!NS_IsMainThread()); - MOZ_ASSERT(!mShuttingDownOnIOThread); + void SetDelayedConnectTask(CancelableTask* aTask); + void ClearDelayedConnectTask(); + void CancelDelayedConnectTask(); - Close(); // will also remove fd from I/O loop - mShuttingDownOnIOThread = true; - } - - void SetDelayedConnectTask(CancelableTask* aTask) - { - MOZ_ASSERT(NS_IsMainThread()); - mDelayedConnectTask = aTask; - } - - void ClearDelayedConnectTask() - { - MOZ_ASSERT(NS_IsMainThread()); - mDelayedConnectTask = nullptr; - } - - void CancelDelayedConnectTask() - { - MOZ_ASSERT(NS_IsMainThread()); - if (!mDelayedConnectTask) { - return; - } - mDelayedConnectTask->Cancel(); - ClearDelayedConnectTask(); - } - - /** - * Connect to a socket - */ - void Connect(); + // Task callback methods + // /** * Run bind/listen to prepare for further runs of accept() */ void Listen(); - void GetSocketAddr(nsAString& aAddrStr) - { - if (!mConnector) { - NS_WARNING("No connector to get socket address from!"); - aAddrStr.Truncate(); - return; - } - mConnector->GetSocketAddr(mAddr, aAddrStr); - } - /** - * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated - * directly from main thread. All non-main-thread accesses should happen with - * mImpl as container. + * Connect to a socket */ - RefPtr mConsumer; + void Connect(); + + void Send(UnixSocketRawData* aData); + + // I/O callback methods + // void OnAccepted(int aFd, const sockaddr_any* aAddr, socklen_t aAddrLen) MOZ_OVERRIDE; @@ -132,16 +74,18 @@ public: void OnSocketCanReceiveWithoutBlocking() MOZ_OVERRIDE; void OnSocketCanSendWithoutBlocking() MOZ_OVERRIDE; - SocketConsumerBase* GetConsumer() - { - return mConsumer.get(); - } - private: - // Set up flags on whatever our current file descriptor is. + void FireSocketError(); + + // Set up flags on file descriptor. static bool SetSocketFlags(int aFd); - void FireSocketError(); + /** + * Consumer pointer. Non-thread safe RefPtr, so should only be manipulated + * directly from main thread. All non-main-thread accesses should happen with + * mIO as container. + */ + RefPtr mConsumer; /** * Connector object used to create the connection we are currently using. @@ -174,77 +118,109 @@ private: CancelableTask* mDelayedConnectTask; }; -class SocketListenTask : public SocketIOTask +UnixSocketConsumerIO::UnixSocketConsumerIO(MessageLoop* mIOLoop, + UnixSocketConsumer* aConsumer, + UnixSocketConnector* aConnector, + const nsACString& aAddress) + : UnixSocketWatcher(mIOLoop) + , SocketIOBase(MAX_READ_SIZE) + , mConsumer(aConsumer) + , mConnector(aConnector) + , mShuttingDownOnIOThread(false) + , mAddress(aAddress) + , mDelayedConnectTask(nullptr) { -public: - SocketListenTask(UnixSocketImpl* aImpl) - : SocketIOTask(aImpl) - { } + MOZ_ASSERT(mConsumer); + MOZ_ASSERT(mConnector); +} - void Run() MOZ_OVERRIDE - { - MOZ_ASSERT(!NS_IsMainThread()); - if (!IsCanceled()) { - GetIO()->Listen(); - } - } -}; - -class SocketConnectTask : public SocketIOTask +UnixSocketConsumerIO::~UnixSocketConsumerIO() { -public: - SocketConnectTask(UnixSocketImpl* aImpl) - : SocketIOTask(aImpl) - { } - - void Run() MOZ_OVERRIDE - { - MOZ_ASSERT(!NS_IsMainThread()); - MOZ_ASSERT(!IsCanceled()); - GetIO()->Connect(); - } -}; - -class SocketDelayedConnectTask : public SocketIOTask -{ -public: - SocketDelayedConnectTask(UnixSocketImpl* aImpl) - : SocketIOTask(aImpl) - { } - - void Run() MOZ_OVERRIDE - { - MOZ_ASSERT(NS_IsMainThread()); - if (IsCanceled()) { - return; - } - UnixSocketImpl* impl = GetIO(); - if (impl->IsShutdownOnMainThread()) { - return; - } - impl->ClearDelayedConnectTask(); - XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new SocketConnectTask(impl)); - } -}; - -void -UnixSocketImpl::FireSocketError() -{ - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - - // Clean up watchers, statuses, fds - Close(); - - // Tell the main thread we've errored - nsRefPtr r = - new SocketIOEventRunnable( - this, SocketIOEventRunnable::CONNECT_ERROR); - - NS_DispatchToMainThread(r); + MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(IsShutdownOnMainThread()); } void -UnixSocketImpl::Listen() +UnixSocketConsumerIO::GetSocketAddr(nsAString& aAddrStr) const +{ + if (!mConnector) { + NS_WARNING("No connector to get socket address from!"); + aAddrStr.Truncate(); + return; + } + mConnector->GetSocketAddr(mAddr, aAddrStr); +} + +SocketConsumerBase* +UnixSocketConsumerIO::GetConsumer() +{ + return mConsumer.get(); +} + +bool +UnixSocketConsumerIO::IsShutdownOnMainThread() const +{ + MOZ_ASSERT(NS_IsMainThread()); + + return mConsumer == nullptr; +} + +void +UnixSocketConsumerIO::ShutdownOnMainThread() +{ + MOZ_ASSERT(NS_IsMainThread()); + MOZ_ASSERT(!IsShutdownOnMainThread()); + + mConsumer = nullptr; +} + +bool +UnixSocketConsumerIO::IsShutdownOnIOThread() const +{ + return mShuttingDownOnIOThread; +} + +void +UnixSocketConsumerIO::ShutdownOnIOThread() +{ + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!mShuttingDownOnIOThread); + + Close(); // will also remove fd from I/O loop + mShuttingDownOnIOThread = true; +} + +void +UnixSocketConsumerIO::SetDelayedConnectTask(CancelableTask* aTask) +{ + MOZ_ASSERT(NS_IsMainThread()); + + mDelayedConnectTask = aTask; +} + +void +UnixSocketConsumerIO::ClearDelayedConnectTask() +{ + MOZ_ASSERT(NS_IsMainThread()); + + mDelayedConnectTask = nullptr; +} + +void +UnixSocketConsumerIO::CancelDelayedConnectTask() +{ + MOZ_ASSERT(NS_IsMainThread()); + + if (!mDelayedConnectTask) { + return; + } + + mDelayedConnectTask->Cancel(); + ClearDelayedConnectTask(); +} + +void +UnixSocketConsumerIO::Listen() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(mConnector); @@ -279,7 +255,7 @@ UnixSocketImpl::Listen() } void -UnixSocketImpl::Connect() +UnixSocketConsumerIO::Connect() { MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); MOZ_ASSERT(mConnector); @@ -311,8 +287,149 @@ UnixSocketImpl::Connect() NS_WARN_IF(NS_FAILED(rv)); } +void +UnixSocketConsumerIO::Send(UnixSocketRawData* aData) +{ + EnqueueData(aData); + AddWatchers(WRITE_WATCHER, false); +} + +void +UnixSocketConsumerIO::OnAccepted(int aFd, + const sockaddr_any* aAddr, + socklen_t aAddrLen) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + MOZ_ASSERT(aAddr); + MOZ_ASSERT(aAddrLen <= sizeof(mAddr)); + + memcpy (&mAddr, aAddr, aAddrLen); + mAddrSize = aAddrLen; + + if (!mConnector->SetUp(aFd)) { + NS_WARNING("Could not set up socket!"); + return; + } + + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + Close(); + if (!SetSocketFlags(aFd)) { + return; + } + SetSocket(aFd, SOCKET_IS_CONNECTED); + + nsRefPtr r = + new SocketIOEventRunnable( + this, SocketIOEventRunnable::CONNECT_SUCCESS); + NS_DispatchToMainThread(r); + + AddWatchers(READ_WATCHER, true); + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +UnixSocketConsumerIO::OnConnected() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); + + if (!SetSocketFlags(GetFd())) { + NS_WARNING("Cannot set socket flags!"); + FireSocketError(); + return; + } + + if (!mConnector->SetUp(GetFd())) { + NS_WARNING("Could not set up socket!"); + FireSocketError(); + return; + } + + nsRefPtr r = + new SocketIOEventRunnable( + this, SocketIOEventRunnable::CONNECT_SUCCESS); + NS_DispatchToMainThread(r); + + AddWatchers(READ_WATCHER, true); + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +UnixSocketConsumerIO::OnListening() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); + + if (!mConnector->SetUpListenSocket(GetFd())) { + NS_WARNING("Could not set up listen socket!"); + FireSocketError(); + return; + } + + AddWatchers(READ_WATCHER, true); +} + +void +UnixSocketConsumerIO::OnError(const char* aFunction, int aErrno) +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + UnixFdWatcher::OnError(aFunction, aErrno); + FireSocketError(); +} + +void +UnixSocketConsumerIO::OnSocketCanReceiveWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + + nsresult rv = ReceiveData(GetFd(), this); + if (NS_FAILED(rv)) { + RemoveWatchers(READ_WATCHER|WRITE_WATCHER); + return; + } +} + +void +UnixSocketConsumerIO::OnSocketCanSendWithoutBlocking() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 + + nsresult rv = SendPendingData(GetFd(), this); + if (NS_FAILED(rv)) { + return; + } + + if (HasPendingData()) { + AddWatchers(WRITE_WATCHER, false); + } +} + +void +UnixSocketConsumerIO::FireSocketError() +{ + MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); + + // Clean up watchers, statuses, fds + Close(); + + // Tell the main thread we've errored + nsRefPtr r = + new SocketIOEventRunnable( + this, SocketIOEventRunnable::CONNECT_ERROR); + + NS_DispatchToMainThread(r); +} + bool -UnixSocketImpl::SetSocketFlags(int aFd) +UnixSocketConsumerIO::SetSocketFlags(int aFd) { // Set socket addr to be reused even if kernel is still waiting to close int n = 1; @@ -343,148 +460,92 @@ UnixSocketImpl::SetSocketFlags(int aFd) return true; } -void -UnixSocketImpl::OnAccepted(int aFd, - const sockaddr_any* aAddr, - socklen_t aAddrLen) +// +// Socket tasks +// + +class ListenTask MOZ_FINAL : public SocketIOTask { - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); - MOZ_ASSERT(aAddr); - MOZ_ASSERT(aAddrLen <= sizeof(mAddr)); +public: + ListenTask(UnixSocketConsumerIO* aIO) + : SocketIOTask(aIO) + { } - memcpy (&mAddr, aAddr, aAddrLen); - mAddrSize = aAddrLen; + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); - if (!mConnector->SetUp(aFd)) { - NS_WARNING("Could not set up socket!"); - return; + if (!IsCanceled()) { + GetIO()->Listen(); + } } +}; - RemoveWatchers(READ_WATCHER|WRITE_WATCHER); - Close(); - if (!SetSocketFlags(aFd)) { - return; - } - SetSocket(aFd, SOCKET_IS_CONNECTED); - - nsRefPtr r = - new SocketIOEventRunnable( - this, SocketIOEventRunnable::CONNECT_SUCCESS); - NS_DispatchToMainThread(r); - - AddWatchers(READ_WATCHER, true); - if (HasPendingData()) { - AddWatchers(WRITE_WATCHER, false); - } -} - -void -UnixSocketImpl::OnConnected() +class ConnectTask MOZ_FINAL : public SocketIOTask { - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); +public: + ConnectTask(UnixSocketConsumerIO* aIO) + : SocketIOTask(aIO) + { } - if (!SetSocketFlags(GetFd())) { - NS_WARNING("Cannot set socket flags!"); - FireSocketError(); - return; + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(!NS_IsMainThread()); + MOZ_ASSERT(!IsCanceled()); + + GetIO()->Connect(); } +}; - if (!mConnector->SetUp(GetFd())) { - NS_WARNING("Could not set up socket!"); - FireSocketError(); - return; - } - - nsRefPtr r = - new SocketIOEventRunnable( - this, SocketIOEventRunnable::CONNECT_SUCCESS); - NS_DispatchToMainThread(r); - - AddWatchers(READ_WATCHER, true); - if (HasPendingData()) { - AddWatchers(WRITE_WATCHER, false); - } -} - -void -UnixSocketImpl::OnListening() +class DelayedConnectTask MOZ_FINAL : public SocketIOTask { - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_LISTENING); +public: + DelayedConnectTask(UnixSocketConsumerIO* aIO) + : SocketIOTask(aIO) + { } - if (!mConnector->SetUpListenSocket(GetFd())) { - NS_WARNING("Could not set up listen socket!"); - FireSocketError(); - return; + void Run() MOZ_OVERRIDE + { + MOZ_ASSERT(NS_IsMainThread()); + + if (IsCanceled()) { + return; + } + + UnixSocketConsumerIO* io = GetIO(); + if (io->IsShutdownOnMainThread()) { + return; + } + + io->ClearDelayedConnectTask(); + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ConnectTask(io)); } - - AddWatchers(READ_WATCHER, true); -} - -void -UnixSocketImpl::OnError(const char* aFunction, int aErrno) -{ - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - - UnixFdWatcher::OnError(aFunction, aErrno); - FireSocketError(); -} - -void -UnixSocketImpl::OnSocketCanReceiveWithoutBlocking() -{ - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 - - nsresult rv = ReceiveData(GetFd(), this); - if (NS_FAILED(rv)) { - RemoveWatchers(READ_WATCHER|WRITE_WATCHER); - return; - } -} - -void -UnixSocketImpl::OnSocketCanSendWithoutBlocking() -{ - MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop()); - MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984 - - nsresult rv = SendPendingData(GetFd(), this); - if (NS_FAILED(rv)) { - return; - } - - if (HasPendingData()) { - AddWatchers(WRITE_WATCHER, false); - } -} +}; // // UnixSocketConsumer // UnixSocketConsumer::UnixSocketConsumer() -: mImpl(nullptr) +: mIO(nullptr) { } UnixSocketConsumer::~UnixSocketConsumer() { - MOZ_ASSERT(!mImpl); + MOZ_ASSERT(!mIO); } bool UnixSocketConsumer::SendSocketData(UnixSocketRawData* aData) { MOZ_ASSERT(NS_IsMainThread()); - if (!mImpl) { + if (!mIO) { return false; } - MOZ_ASSERT(!mImpl->IsShutdownOnMainThread()); + MOZ_ASSERT(!mIO->IsShutdownOnMainThread()); XRE_GetIOMessageLoop()->PostTask( - FROM_HERE, new SocketIOSendTask(mImpl, aData)); + FROM_HERE, new SocketIOSendTask(mIO, aData)); return true; } @@ -512,21 +573,21 @@ void UnixSocketConsumer::CloseSocket() { MOZ_ASSERT(NS_IsMainThread()); - if (!mImpl) { + if (!mIO) { return; } - mImpl->CancelDelayedConnectTask(); + mIO->CancelDelayedConnectTask(); - // From this point on, we consider mImpl as being deleted. + // From this point on, we consider mIO as being deleted. // We sever the relationship here so any future calls to listen or connect // will create a new implementation. - mImpl->ShutdownOnMainThread(); + mIO->ShutdownOnMainThread(); XRE_GetIOMessageLoop()->PostTask( - FROM_HERE, new SocketIOShutdownTask(mImpl)); + FROM_HERE, new SocketIOShutdownTask(mIO)); - mImpl = nullptr; + mIO = nullptr; NotifyDisconnect(); } @@ -535,11 +596,11 @@ void UnixSocketConsumer::GetSocketAddr(nsAString& aAddrStr) { aAddrStr.Truncate(); - if (!mImpl || GetConnectionStatus() != SOCKET_CONNECTED) { + if (!mIO || GetConnectionStatus() != SOCKET_CONNECTED) { NS_WARNING("No socket currently open!"); return; } - mImpl->GetSocketAddr(aAddrStr); + mIO->GetSocketAddr(aAddrStr); } bool @@ -552,21 +613,21 @@ UnixSocketConsumer::ConnectSocket(UnixSocketConnector* aConnector, nsAutoPtr connector(aConnector); - if (mImpl) { + if (mIO) { NS_WARNING("Socket already connecting/connected!"); return false; } nsCString addr(aAddress); MessageLoop* ioLoop = XRE_GetIOMessageLoop(); - mImpl = new UnixSocketImpl(ioLoop, this, connector.forget(), addr); + mIO = new UnixSocketConsumerIO(ioLoop, this, connector.forget(), addr); SetConnectionStatus(SOCKET_CONNECTING); if (aDelayMs > 0) { - SocketDelayedConnectTask* connectTask = new SocketDelayedConnectTask(mImpl); - mImpl->SetDelayedConnectTask(connectTask); + DelayedConnectTask* connectTask = new DelayedConnectTask(mIO); + mIO->SetDelayedConnectTask(connectTask); MessageLoop::current()->PostDelayedTask(FROM_HERE, connectTask, aDelayMs); } else { - ioLoop->PostTask(FROM_HERE, new SocketConnectTask(mImpl)); + ioLoop->PostTask(FROM_HERE, new ConnectTask(mIO)); } return true; } @@ -579,16 +640,15 @@ UnixSocketConsumer::ListenSocket(UnixSocketConnector* aConnector) nsAutoPtr connector(aConnector); - if (mImpl) { + if (mIO) { NS_WARNING("Socket already connecting/connected!"); return false; } - mImpl = new UnixSocketImpl(XRE_GetIOMessageLoop(), this, connector.forget(), - EmptyCString()); + mIO = new UnixSocketConsumerIO( + XRE_GetIOMessageLoop(), this, connector.forget(), EmptyCString()); SetConnectionStatus(SOCKET_LISTENING); - XRE_GetIOMessageLoop()->PostTask(FROM_HERE, - new SocketListenTask(mImpl)); + XRE_GetIOMessageLoop()->PostTask(FROM_HERE, new ListenTask(mIO)); return true; } diff --git a/ipc/unixsocket/UnixSocket.h b/ipc/unixsocket/UnixSocket.h index b353f50bbb83..ee7e3692a2a6 100644 --- a/ipc/unixsocket/UnixSocket.h +++ b/ipc/unixsocket/UnixSocket.h @@ -18,7 +18,7 @@ namespace mozilla { namespace ipc { -class UnixSocketImpl; +class UnixSocketConsumerIO; /** * UnixSocketConnector defines the socket creation and connection/listening @@ -160,7 +160,7 @@ public: void GetSocketAddr(nsAString& aAddrStr); private: - UnixSocketImpl* mImpl; + UnixSocketConsumerIO* mIO; }; } // namespace ipc