Bug 1170466: Share socket I/O methods in |ConnectionOrientedSocketIO|, r=kmachulis

With this patch, |ConnectionOrientedSocketIO| implements methods
for sending and receiving data. All sub-classes have been changed
accordingly.
This commit is contained in:
Thomas Zimmermann 2015-06-03 11:53:50 +02:00
Родитель bc636a00f4
Коммит 7eb46d9e39
4 изменённых файлов: 96 добавлений и 160 удалений

Просмотреть файл

@ -209,17 +209,6 @@ public:
BluetoothDaemonConnection* aConnection,
BluetoothDaemonPDUConsumer* aConsumer);
// Task callback methods
//
void Send(UnixSocketIOBuffer* aBuffer);
void OnSocketCanReceiveWithoutBlocking() override;
void OnSocketCanSendWithoutBlocking() override;
void OnConnected() override;
void OnError(const char* aFunction, int aErrno) override;
// Methods for |ConnectionOrientedSocketIO|
//
@ -271,72 +260,6 @@ BluetoothDaemonConnectionIO::BluetoothDaemonConnectionIO(
MOZ_ASSERT(mConsumer);
}
void
BluetoothDaemonConnectionIO::Send(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(aBuffer);
EnqueueData(aBuffer);
AddWatchers(WRITE_WATCHER, false);
}
void
BluetoothDaemonConnectionIO::OnSocketCanReceiveWithoutBlocking()
{
ssize_t res = ReceiveData(GetFd());
if (res < 0) {
/* I/O error */
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
} else if (!res) {
/* EOF or peer shutdown */
RemoveWatchers(READ_WATCHER);
}
}
void
BluetoothDaemonConnectionIO::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
MOZ_ASSERT(!IsShutdownOnIOThread());
if (NS_WARN_IF(NS_FAILED(SendPendingData(GetFd())))) {
RemoveWatchers(WRITE_WATCHER);
}
}
void
BluetoothDaemonConnectionIO::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
BluetoothDaemonConnectionIO::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
// Clean up watchers, status, fd
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
// |ConnectionOrientedSocketIO|
nsresult

Просмотреть файл

@ -32,6 +32,90 @@ ConnectionOrientedSocketIO::ConnectionOrientedSocketIO(
ConnectionOrientedSocketIO::~ConnectionOrientedSocketIO()
{ }
void
ConnectionOrientedSocketIO::Send(UnixSocketIOBuffer* aBuffer)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
EnqueueData(aBuffer);
AddWatchers(WRITE_WATCHER, false);
}
// |UnixSocketWatcher|
void
ConnectionOrientedSocketIO::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
ssize_t res = ReceiveData(GetFd());
if (res < 0) {
/* I/O error */
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
} else if (!res) {
/* EOF or peer shutdown */
RemoveWatchers(READ_WATCHER);
}
}
void
ConnectionOrientedSocketIO::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
MOZ_ASSERT(!IsShutdownOnIOThread());
nsresult rv = SendPendingData(GetFd());
if (NS_FAILED(rv)) {
return;
}
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
ConnectionOrientedSocketIO::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
ConnectionOrientedSocketIO::OnListening()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
NS_NOTREACHED("Invalid call to |ConnectionOrientedSocketIO::OnListening|");
}
void
ConnectionOrientedSocketIO::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
// Clean up watchers, status, fd
Close();
// Tell the consumer thread we've errored
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_ERROR),
NS_DISPATCH_NORMAL);
}
//
// ConnectionOrientedSocket
//

Просмотреть файл

@ -35,6 +35,18 @@ public:
const struct sockaddr* aAddress,
socklen_t aAddressLength) = 0;
void Send(UnixSocketIOBuffer* aBuffer);
// Methods for |UnixSocketWatcher|
//
void OnSocketCanReceiveWithoutBlocking() final;
void OnSocketCanSendWithoutBlocking() final;
void OnListening() final;
void OnConnected() final;
void OnError(const char* aFunction, int aErrno) final;
protected:
/**
* Constructs an instance of |ConnectionOrientedSocketIO|

Просмотреть файл

@ -56,17 +56,6 @@ public:
*/
void Connect();
void Send(UnixSocketIOBuffer* aBuffer);
// I/O callback methods
//
void OnConnected() override;
void OnError(const char* aFunction, int aErrno) override;
void OnListening() override;
void OnSocketCanReceiveWithoutBlocking() override;
void OnSocketCanSendWithoutBlocking() override;
// Methods for |ConnectionOrientedSocketIO|
//
@ -239,78 +228,6 @@ StreamSocketIO::Connect()
NS_WARN_IF(NS_FAILED(rv));
}
void
StreamSocketIO::Send(UnixSocketIOBuffer* aData)
{
EnqueueData(aData);
AddWatchers(WRITE_WATCHER, false);
}
void
StreamSocketIO::OnConnected()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED);
GetConsumerThread()->Dispatch(
new SocketIOEventRunnable(this, SocketIOEventRunnable::CONNECT_SUCCESS),
NS_DISPATCH_NORMAL);
AddWatchers(READ_WATCHER, true);
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
StreamSocketIO::OnListening()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
NS_NOTREACHED("Invalid call to |StreamSocketIO::OnListening|");
}
void
StreamSocketIO::OnError(const char* aFunction, int aErrno)
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
UnixFdWatcher::OnError(aFunction, aErrno);
FireSocketError();
}
void
StreamSocketIO::OnSocketCanReceiveWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
ssize_t res = ReceiveData(GetFd());
if (res < 0) {
/* I/O error */
RemoveWatchers(READ_WATCHER|WRITE_WATCHER);
} else if (!res) {
/* EOF or peer shutdown */
RemoveWatchers(READ_WATCHER);
}
}
void
StreamSocketIO::OnSocketCanSendWithoutBlocking()
{
MOZ_ASSERT(MessageLoopForIO::current() == GetIOLoop());
MOZ_ASSERT(GetConnectionStatus() == SOCKET_IS_CONNECTED); // see bug 990984
nsresult rv = SendPendingData(GetFd());
if (NS_FAILED(rv)) {
return;
}
if (HasPendingData()) {
AddWatchers(WRITE_WATCHER, false);
}
}
void
StreamSocketIO::FireSocketError()
{