Bug 854846 - Make UnixSocket's connect function non-block. r=tzimmermann, a=leo+

This commit is contained in:
Kyle Machulis 2013-05-14 11:26:18 -04:00
Родитель 733a1cd2f3
Коммит d5aa94f2bc
1 изменённых файлов: 103 добавлений и 34 удалений

Просмотреть файл

@ -553,9 +553,49 @@ UnixSocketImpl::Connect()
return; return;
} }
// Select non-blocking IO.
if (!SetNonblockFlags()) {
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
NS_DispatchToMainThread(t);
return;
}
ret = connect(mFd.get(), (struct sockaddr*)&mAddr, mAddrSize); ret = connect(mFd.get(), (struct sockaddr*)&mAddr, mAddrSize);
if (ret) { if (ret) {
if (errno == EINPROGRESS) {
// Select blocking IO again, since we've now at least queue'd the connect
// as nonblock.
int current_opts = fcntl(mFd.get(), F_GETFL, 0);
if (-1 == current_opts) {
NS_WARNING("Cannot get socket opts!");
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
NS_DispatchToMainThread(t);
return;
}
if (-1 == fcntl(mFd.get(), F_SETFL, current_opts & ~O_NONBLOCK)) {
NS_WARNING("Cannot set socket opts to blocking!");
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
NS_DispatchToMainThread(t);
return;
}
// Set up a write watch to make sure we receive the connect signal
MessageLoopForIO::current()->WatchFileDescriptor(
mFd.get(),
false,
MessageLoopForIO::WATCH_WRITE,
&mWriteWatcher,
this);
#ifdef DEBUG
LOG("UnixSocket Connection delayed!");
#endif
return;
}
#if DEBUG #if DEBUG
LOG("Socket connect errno=%d\n", errno); LOG("Socket connect errno=%d\n", errno);
#endif #endif
@ -716,47 +756,76 @@ UnixSocketImpl::OnFileCanWriteWithoutBlocking(int aFd)
{ {
MOZ_ASSERT(!NS_IsMainThread()); MOZ_ASSERT(!NS_IsMainThread());
MOZ_ASSERT(!mShuttingDownOnIOThread); MOZ_ASSERT(!mShuttingDownOnIOThread);
SocketConnectionStatus status = mConsumer->GetConnectionStatus();
if (status == SOCKET_CONNECTED) {
// Try to write the bytes of mCurrentRilRawData. If all were written, continue. // Try to write the bytes of mCurrentRilRawData. If all were written, continue.
// //
// Otherwise, save the byte position of the next byte to write // Otherwise, save the byte position of the next byte to write
// within mCurrentRilRawData, and request another write when the // within mCurrentRilRawData, and request another write when the
// system won't block. // system won't block.
// //
while (true) { while (true) {
UnixSocketRawData* data; UnixSocketRawData* data;
if (mOutgoingQ.IsEmpty()) { if (mOutgoingQ.IsEmpty()) {
return;
}
data = mOutgoingQ.ElementAt(0);
const uint8_t *toWrite;
toWrite = data->mData;
while (data->mCurrentWriteOffset < data->mSize) {
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset;
ssize_t written;
written = write (aFd, toWrite + data->mCurrentWriteOffset,
write_amount);
if (written > 0) {
data->mCurrentWriteOffset += written;
}
if (written != write_amount) {
break;
}
}
if (data->mCurrentWriteOffset != data->mSize) {
MessageLoopForIO::current()->WatchFileDescriptor(
aFd,
false,
MessageLoopForIO::WATCH_WRITE,
&mWriteWatcher,
this);
return;
}
mOutgoingQ.RemoveElementAt(0);
delete data;
}
} else if (status == SOCKET_CONNECTING) {
int error, ret;
socklen_t len = sizeof(error);
ret = getsockopt(mFd.get(), SOL_SOCKET, SO_ERROR, &error, &len);
if (ret || error) {
NS_WARNING("getsockopt failure on async socket connect!");
nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
NS_DispatchToMainThread(t);
return; return;
} }
data = mOutgoingQ.ElementAt(0);
const uint8_t *toWrite;
toWrite = data->mData;
while (data->mCurrentWriteOffset < data->mSize) { if (!mConnector->SetUp(mFd)) {
ssize_t write_amount = data->mSize - data->mCurrentWriteOffset; NS_WARNING("Could not set up socket!");
ssize_t written; nsRefPtr<OnSocketEventTask> t =
written = write (aFd, toWrite + data->mCurrentWriteOffset, new OnSocketEventTask(this, OnSocketEventTask::CONNECT_ERROR);
write_amount); NS_DispatchToMainThread(t);
if (written > 0) {
data->mCurrentWriteOffset += written;
}
if (written != write_amount) {
break;
}
}
if (data->mCurrentWriteOffset != data->mSize) {
MessageLoopForIO::current()->WatchFileDescriptor(
aFd,
false,
MessageLoopForIO::WATCH_WRITE,
&mWriteWatcher,
this);
return; return;
} }
mOutgoingQ.RemoveElementAt(0);
delete data; nsRefPtr<OnSocketEventTask> t =
new OnSocketEventTask(this, OnSocketEventTask::CONNECT_SUCCESS);
NS_DispatchToMainThread(t);
SetUpIO();
} }
} }
void void