зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1158876: Move management of socket I/O buffers into socket I/O classes, r=kmachulis
This patch moves management of received socket I/O buffers from |DataSocketIO| into the I/O classes. Each I/O class is responsible for (de-)allocating buffers, and consuming them once data has been received. All current I/O classes forward their buffers to the main thread, but other operations are possible. For example, received data can be parsed and processed directly in the I/O thread.
This commit is contained in:
Родитель
ed4b4e63be
Коммит
10488d5115
|
@ -75,7 +75,6 @@ public:
|
|||
|
||||
DroidSocketImpl(MessageLoop* aIOLoop, BluetoothSocket* aConsumer)
|
||||
: ipc::UnixFdWatcher(aIOLoop)
|
||||
, DataSocketIO(MAX_READ_SIZE)
|
||||
, mConsumer(aConsumer)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
, mConnectionStatus(SOCKET_IS_DISCONNECTED)
|
||||
|
@ -145,6 +144,13 @@ public:
|
|||
return GetDataSocket();
|
||||
}
|
||||
|
||||
// Methods for |DataSocket|
|
||||
//
|
||||
|
||||
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
|
||||
void ConsumeBuffer();
|
||||
void DiscardBuffer();
|
||||
|
||||
/**
|
||||
* Consumer pointer. Non-thread safe RefPtr, so should only be manipulated
|
||||
* directly from main thread. All non-main-thread accesses should happen with
|
||||
|
@ -180,6 +186,11 @@ private:
|
|||
bool mShuttingDownOnIOThread;
|
||||
|
||||
ConnectionStatus mConnectionStatus;
|
||||
|
||||
/**
|
||||
* I/O buffer for received data
|
||||
*/
|
||||
nsAutoPtr<UnixSocketRawData> mBuffer;
|
||||
};
|
||||
|
||||
class SocketConnectTask final : public SocketIOTask<DroidSocketImpl>
|
||||
|
@ -492,6 +503,33 @@ DroidSocketImpl::OnSocketCanConnectWithoutBlocking(int aFd)
|
|||
}
|
||||
}
|
||||
|
||||
nsresult
|
||||
DroidSocketImpl::QueryReceiveBuffer(
|
||||
UnixSocketIOBuffer** aBuffer)
|
||||
{
|
||||
MOZ_ASSERT(aBuffer);
|
||||
|
||||
if (!mBuffer) {
|
||||
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
|
||||
}
|
||||
*aBuffer = mBuffer.get();
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
void
|
||||
DroidSocketImpl::ConsumeBuffer()
|
||||
{
|
||||
NS_DispatchToMainThread(
|
||||
new SocketIOReceiveRunnable<DroidSocketImpl>(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
DroidSocketImpl::DiscardBuffer()
|
||||
{
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
BluetoothSocket::BluetoothSocket(BluetoothSocketObserver* aObserver,
|
||||
BluetoothSocketType aType,
|
||||
bool aAuth,
|
||||
|
|
|
@ -80,6 +80,13 @@ public:
|
|||
void OnSocketCanReceiveWithoutBlocking() override;
|
||||
void OnSocketCanSendWithoutBlocking() override;
|
||||
|
||||
// Methods for |DataSocket|
|
||||
//
|
||||
|
||||
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
|
||||
void ConsumeBuffer();
|
||||
void DiscardBuffer();
|
||||
|
||||
private:
|
||||
void FireSocketError();
|
||||
|
||||
|
@ -122,6 +129,11 @@ private:
|
|||
* Task member for delayed connect task. Should only be access on main thread.
|
||||
*/
|
||||
CancelableTask* mDelayedConnectTask;
|
||||
|
||||
/**
|
||||
* I/O buffer for received data
|
||||
*/
|
||||
nsAutoPtr<UnixSocketRawData> mBuffer;
|
||||
};
|
||||
|
||||
BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
|
||||
|
@ -130,7 +142,6 @@ BluetoothSocket::BluetoothSocketIO::BluetoothSocketIO(
|
|||
UnixSocketConnector* aConnector,
|
||||
const nsACString& aAddress)
|
||||
: UnixSocketWatcher(mIOLoop)
|
||||
, DataSocketIO(MAX_READ_SIZE)
|
||||
, mConsumer(aConsumer)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
|
@ -475,6 +486,33 @@ BluetoothSocket::BluetoothSocketIO::SetSocketFlags(int aFd)
|
|||
return true;
|
||||
}
|
||||
|
||||
nsresult
|
||||
BluetoothSocket::BluetoothSocketIO::QueryReceiveBuffer(
|
||||
UnixSocketIOBuffer** aBuffer)
|
||||
{
|
||||
MOZ_ASSERT(aBuffer);
|
||||
|
||||
if (!mBuffer) {
|
||||
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
|
||||
}
|
||||
*aBuffer = mBuffer.get();
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
void
|
||||
BluetoothSocket::BluetoothSocketIO::ConsumeBuffer()
|
||||
{
|
||||
NS_DispatchToMainThread(
|
||||
new SocketIOReceiveRunnable<BluetoothSocketIO>(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
BluetoothSocket::BluetoothSocketIO::DiscardBuffer()
|
||||
{
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
//
|
||||
// Socket tasks
|
||||
//
|
||||
|
|
|
@ -34,8 +34,7 @@ DataSocketIO::HasPendingData() const
|
|||
return !mOutgoingQ.IsEmpty();
|
||||
}
|
||||
|
||||
DataSocketIO::DataSocketIO(size_t aMaxReadSize)
|
||||
: mMaxReadSize(aMaxReadSize)
|
||||
DataSocketIO::DataSocketIO()
|
||||
{ }
|
||||
|
||||
//
|
||||
|
|
|
@ -101,6 +101,37 @@ class DataSocketIO : public SocketIOBase
|
|||
public:
|
||||
virtual ~DataSocketIO();
|
||||
|
||||
/**
|
||||
* Allocates a buffer for receiving data from the socket. The method
|
||||
* shall return the buffer in the arguments. The buffer is owned by the
|
||||
* I/O class. |DataSocketIO| will never ask for more than one buffer
|
||||
* at a time, so I/O classes can handout the same buffer on each invokation
|
||||
* of this method. I/O-thread only.
|
||||
*
|
||||
* @param[out] aBuffer returns a pointer to the I/O buffer
|
||||
* @return NS_OK on success, or an error code otherwise
|
||||
*/
|
||||
virtual nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer) = 0;
|
||||
|
||||
/**
|
||||
* Marks the current socket buffer to by consumed by the I/O class. The
|
||||
* class is resonsible for releasing the buffer afterwards. I/O-thread
|
||||
* only.
|
||||
*
|
||||
* @param aIndex the socket's index
|
||||
* @param[out] aBuffer the receive buffer
|
||||
* @param[out] aSize the receive buffer's size
|
||||
*/
|
||||
virtual void ConsumeBuffer() = 0;
|
||||
|
||||
/**
|
||||
* Marks the current socket buffer to be discarded. The I/O class is
|
||||
* resonsible for releasing the buffer's memory. I/O-thread only.
|
||||
*
|
||||
* @param aIndex the socket's index
|
||||
*/
|
||||
virtual void DiscardBuffer() = 0;
|
||||
|
||||
void EnqueueData(UnixSocketIOBuffer* aBuffer);
|
||||
bool HasPendingData() const;
|
||||
|
||||
|
@ -110,17 +141,25 @@ public:
|
|||
MOZ_ASSERT(aFd >= 0);
|
||||
MOZ_ASSERT(aIO);
|
||||
|
||||
nsAutoPtr<UnixSocketRawData> incoming(
|
||||
new UnixSocketRawData(mMaxReadSize));
|
||||
UnixSocketIOBuffer* incoming;
|
||||
nsresult rv = QueryReceiveBuffer(&incoming);
|
||||
if (NS_FAILED(rv)) {
|
||||
/* an error occured */
|
||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
||||
NS_DispatchToMainThread(r);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ssize_t res = incoming->Receive(aFd);
|
||||
if (res < 0) {
|
||||
/* an I/O error occured */
|
||||
DiscardBuffer();
|
||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
||||
NS_DispatchToMainThread(r);
|
||||
return -1;
|
||||
} else if (!res) {
|
||||
/* EOF or peer shut down sending */
|
||||
DiscardBuffer();
|
||||
nsRefPtr<nsRunnable> r = new SocketIORequestClosingRunnable<T>(aIO);
|
||||
NS_DispatchToMainThread(r);
|
||||
return 0;
|
||||
|
@ -132,9 +171,7 @@ public:
|
|||
AutoSourceEvent taskTracerEvent(SourceEventType::Unixsocket);
|
||||
#endif
|
||||
|
||||
nsRefPtr<nsRunnable> r =
|
||||
new SocketIOReceiveRunnable<T>(aIO, incoming.forget());
|
||||
NS_DispatchToMainThread(r);
|
||||
ConsumeBuffer();
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -168,11 +205,9 @@ public:
|
|||
}
|
||||
|
||||
protected:
|
||||
DataSocketIO(size_t aMaxReadSize);
|
||||
DataSocketIO();
|
||||
|
||||
private:
|
||||
const size_t mMaxReadSize;
|
||||
|
||||
/**
|
||||
* Raw data queue. Must be pushed/popped from I/O thread only.
|
||||
*/
|
||||
|
|
|
@ -86,6 +86,13 @@ public:
|
|||
void OnSocketCanReceiveWithoutBlocking() override;
|
||||
void OnSocketCanSendWithoutBlocking() override;
|
||||
|
||||
// Methods for |DataSocket|
|
||||
//
|
||||
|
||||
nsresult QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer);
|
||||
void ConsumeBuffer();
|
||||
void DiscardBuffer();
|
||||
|
||||
private:
|
||||
void FireSocketError();
|
||||
|
||||
|
@ -128,6 +135,11 @@ private:
|
|||
* Task member for delayed connect task. Should only be access on main thread.
|
||||
*/
|
||||
CancelableTask* mDelayedConnectTask;
|
||||
|
||||
/**
|
||||
* I/O buffer for received data
|
||||
*/
|
||||
nsAutoPtr<UnixSocketRawData> mBuffer;
|
||||
};
|
||||
|
||||
StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
|
||||
|
@ -135,7 +147,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop,
|
|||
UnixSocketConnector* aConnector,
|
||||
const nsACString& aAddress)
|
||||
: UnixSocketWatcher(mIOLoop)
|
||||
, DataSocketIO(MAX_READ_SIZE)
|
||||
, mStreamSocket(aStreamSocket)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
|
@ -152,7 +163,6 @@ StreamSocketIO::StreamSocketIO(MessageLoop* mIOLoop, int aFd,
|
|||
UnixSocketConnector* aConnector,
|
||||
const nsACString& aAddress)
|
||||
: UnixSocketWatcher(mIOLoop, aFd, aConnectionStatus)
|
||||
, DataSocketIO(MAX_READ_SIZE)
|
||||
, mStreamSocket(aStreamSocket)
|
||||
, mConnector(aConnector)
|
||||
, mShuttingDownOnIOThread(false)
|
||||
|
@ -498,6 +508,32 @@ StreamSocketIO::SetSocketFlags(int aFd)
|
|||
return true;
|
||||
}
|
||||
|
||||
nsresult
|
||||
StreamSocketIO::QueryReceiveBuffer(UnixSocketIOBuffer** aBuffer)
|
||||
{
|
||||
MOZ_ASSERT(aBuffer);
|
||||
|
||||
if (!mBuffer) {
|
||||
mBuffer = new UnixSocketRawData(MAX_READ_SIZE);
|
||||
}
|
||||
*aBuffer = mBuffer.get();
|
||||
|
||||
return NS_OK;
|
||||
}
|
||||
|
||||
void
|
||||
StreamSocketIO::ConsumeBuffer()
|
||||
{
|
||||
NS_DispatchToMainThread(
|
||||
new SocketIOReceiveRunnable<StreamSocketIO>(this, mBuffer.forget()));
|
||||
}
|
||||
|
||||
void
|
||||
StreamSocketIO::DiscardBuffer()
|
||||
{
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
//
|
||||
// Socket tasks
|
||||
//
|
||||
|
|
Загрузка…
Ссылка в новой задаче