brings IPC service up-to-date with latest necko changes (NOT PART OF THE BUILD)

This commit is contained in:
darin%netscape.com 2003-01-31 00:40:35 +00:00
Родитель dbaeb4debd
Коммит 3f3ed4a1cf
5 изменённых файлов: 126 добавлений и 277 удалений

Просмотреть файл

@ -50,7 +50,6 @@ MODULE_NAME = ipc
REQUIRES = xpcom \
string \
necko \
ipc \
$(NULL)
CPPSRCS = ipcModule.cpp

Просмотреть файл

@ -39,8 +39,6 @@
#define ipcService_h__
#include "nsIRequest.h"
#include "nsIStreamListener.h"
#include "nsIStreamProvider.h"
#include "nsCOMPtr.h"
#include "nsHashtable.h"

Просмотреть файл

@ -39,7 +39,6 @@
#define ipcTransport_h__
#include "nsIObserver.h"
#include "nsITransport.h"
#include "nsITimer.h"
#include "nsString.h"
#include "nsCOMPtr.h"
@ -48,7 +47,6 @@
#include "ipcMessageQ.h"
#ifdef XP_UNIX
#include "prio.h"
#include "ipcTransportUnix.h"
#endif
@ -81,10 +79,8 @@ public:
, mSpawnedDaemon(PR_FALSE)
, mConnectionAttemptCount(0)
#ifdef XP_UNIX
, mSendQ(this)
, mReceiver(this)
, mFD(nsnull)
, mWriteSuspended(PR_FALSE)
#endif
{ NS_INIT_ISUPPORTS(); }
@ -125,14 +121,12 @@ private:
PRUint8 mConnectionAttemptCount;
#ifdef XP_UNIX
ipcSendQueue mSendQ;
ipcReceiver mReceiver;
nsCOMPtr<nsITransport> mTransport;
nsCOMPtr<nsIRequest> mReadRequest;
nsCOMPtr<nsIRequest> mWriteRequest;
nsCString mSocketPath;
PRFileDesc *mFD;
PRPackedBool mWriteSuspended;
ipcReceiver mReceiver;
nsCOMPtr<nsISocketTransport> mTransport;
nsCOMPtr<nsIInputStream> mInputStream;
nsCOMPtr<nsIOutputStream> mOutputStream;
nsCString mSocketPath;
PRFileDesc *mFD;
//
// unix specific helpers
@ -143,11 +137,9 @@ private:
public:
//
// internal helper methods for ipcSendQueue and ipcReceiver
// internal helper methods
//
void SetWriteSuspended(PRBool val) { mWriteSuspended = val; }
void OnStartRequest(nsIRequest *req);
void OnStopRequest(nsIRequest *req, nsresult status);
void OnConnectionLost(nsresult reason);
PRFileDesc *FD() { return mFD; }
#endif
};

Просмотреть файл

@ -43,9 +43,9 @@
#include "nsDirectoryServiceDefs.h"
#include "ipcSocketProviderUnix.h"
#include "nsISocketTransportService.h"
#include "nsIInputStream.h"
#include "nsIOutputStream.h"
#include "netCore.h"
#include "nsEventQueueUtils.h"
#include "nsNetCID.h"
#include "nsNetError.h"
#include "nsCOMPtr.h"
#include "ipcConfig.h"
@ -56,6 +56,8 @@
static NS_DEFINE_CID(kSocketTransportServiceCID, NS_SOCKETTRANSPORTSERVICE_CID);
#define IPC_BUFFER_SEGMENT_SIZE 4096
//-----------------------------------------------------------------------------
// ipcTransport (XP_UNIX specific methods)
//-----------------------------------------------------------------------------
@ -77,16 +79,32 @@ ipcTransport::Shutdown()
mHaveConnection = PR_FALSE;
if (mReadRequest) {
mReadRequest->Cancel(NS_BINDING_ABORTED);
mReadRequest = nsnull;
if (mTransport) {
mTransport->Close(NS_BINDING_ABORTED);
mTransport = nsnull;
mInputStream = nsnull;
mOutputStream = nsnull;
}
if (mWriteRequest) {
mWriteRequest->Cancel(NS_BINDING_ABORTED);
mWriteRequest = nsnull;
mWriteSuspended = PR_FALSE;
}
mTransport = nsnull;
return NS_OK;
}
static NS_METHOD ipcWriteMessage(nsIOutputStream *stream,
void *closure,
char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countWritten)
{
ipcMessage *msg = (ipcMessage *) closure;
PRBool complete;
PRStatus rv = msg->WriteTo(segment, count, countWritten, &complete);
NS_ASSERTION(rv == PR_SUCCESS, "failed to write message");
// stop writing once the message has been completely written.
if (*countWritten == 0 && complete)
return NS_BASE_STREAM_CLOSED;
return NS_OK;
}
@ -95,20 +113,17 @@ ipcTransport::SendMsg_Internal(ipcMessage *msg)
{
LOG(("ipcTransport::SendMsg_Internal [dataLen=%u]\n", msg->DataLen()));
mSendQ.EnqueueMsg(msg);
NS_ENSURE_TRUE(mOutputStream, NS_ERROR_NOT_INITIALIZED);
if (!mWriteRequest) {
if (!mTransport)
return NS_ERROR_FAILURE;
nsresult rv;
PRUint32 n;
nsresult rv = mTransport->AsyncWrite(&mSendQ, nsnull, 0, PRUint32(-1), 0,
getter_AddRefs(mWriteRequest));
if (NS_FAILED(rv)) return rv;
}
if (mWriteSuspended) {
mWriteRequest->Resume();
mWriteSuspended = PR_FALSE;
}
rv = mOutputStream->WriteSegments(ipcWriteMessage, msg, msg->MsgLen(), &n);
if (NS_FAILED(rv)) return rv;
NS_ASSERTION(n == msg->MsgLen(), "not all bytes written");
delete msg; // done with message
return NS_OK;
}
@ -127,67 +142,66 @@ ipcTransport::Connect()
rv = CreateTransport();
if (NS_FAILED(rv)) return rv;
rv = mTransport->AsyncRead(&mReceiver, nsnull, 0, PRUint32(-1), 0,
getter_AddRefs(mReadRequest));
return rv;
//
// send CLIENT_HELLO; expect CLIENT_ID in response.
//
rv = SendMsg_Internal(new ipcmMessageClientHello());
if (NS_FAILED(rv)) return rv;
//
// put the receiver to work...
//
nsCOMPtr<nsIAsyncInputStream> asyncIn = do_QueryInterface(mInputStream, &rv);
if (NS_FAILED(rv)) return rv;
nsCOMPtr<nsIEventQueue> eventQ;
rv = NS_GetCurrentEventQ(getter_AddRefs(eventQ));
if (NS_FAILED(rv)) return rv;
return asyncIn->AsyncWait(&mReceiver, 0, eventQ);
}
void
ipcTransport::OnStartRequest(nsIRequest *req)
ipcTransport::OnConnectionLost(nsresult reason)
{
nsresult status;
req->GetStatus(&status);
LOG(("ipcTransport::OnConnectionLost [reason=%x]\n", reason));
if (NS_SUCCEEDED(status) && !mHaveConnection && !mSentHello) {
//
// send CLIENT_HELLO; expect CLIENT_ID in response.
//
SendMsg_Internal(new ipcmMessageClientHello());
mSentHello = PR_TRUE;
}
}
PRBool hadConnection = mHaveConnection;
Shutdown();
void
ipcTransport::OnStopRequest(nsIRequest *req, nsresult status)
{
LOG(("ipcTransport::OnStopRequest [status=%x]\n", status));
if (mObserver && hadConnection)
mObserver->OnConnectionLost();
if (mHaveConnection) {
mHaveConnection = PR_FALSE;
if (mObserver)
mObserver->OnConnectionLost();
}
if (status == NS_BINDING_ABORTED)
if (reason == NS_BINDING_ABORTED)
return;
if (NS_FAILED(status) && NS_FAILED(OnConnectFailure()))
return;
if (req == mReadRequest)
mReadRequest = nsnull;
else if (req == mWriteRequest) {
mWriteRequest = nsnull;
mWriteSuspended = PR_FALSE;
}
if (NS_FAILED(reason))
OnConnectFailure();
}
nsresult
ipcTransport::CreateTransport()
{
nsresult rv;
nsCOMPtr<nsISocketTransportService> sts(
do_GetService(kSocketTransportServiceCID, &rv));
if (NS_FAILED(rv)) return rv;
rv = sts->CreateTransportOfType(IPC_SOCKET_TYPE,
"127.0.0.1",
IPC_PORT,
nsnull,
1024,
1024*16,
getter_AddRefs(mTransport));
const char *types[] = { IPC_SOCKET_TYPE };
rv = sts->CreateTransport(types, 1,
NS_LITERAL_CSTRING("127.0.0.1"), IPC_PORT, nsnull,
getter_AddRefs(mTransport));
if (NS_FAILED(rv)) return rv;
// open a blocking, buffered output stream (buffer size is unlimited)
rv = mTransport->OpenOutputStream(nsITransport::OPEN_BLOCKING,
IPC_BUFFER_SEGMENT_SIZE, PRUint32(-1),
getter_AddRefs(mOutputStream));
if (NS_FAILED(rv)) return rv;
// open a non-blocking, buffered input stream (buffer size limited)
rv = mTransport->OpenInputStream(0, IPC_BUFFER_SEGMENT_SIZE, 4,
getter_AddRefs(mInputStream));
return rv;
}
@ -217,110 +231,6 @@ ipcTransport::GetSocketPath(nsACString &socketPath)
return NS_OK;
}
//-----------------------------------------------------------------------------
// ipcSendQueue
//-----------------------------------------------------------------------------
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::AddRef()
{
return mTransport->AddRef();
}
NS_IMETHODIMP_(nsrefcnt)
ipcSendQueue::Release()
{
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcSendQueue, nsIStreamProvider, nsIRequestObserver)
NS_IMETHODIMP
ipcSendQueue::OnStartRequest(nsIRequest *request,
nsISupports *context)
{
LOG(("ipcSendQueue::OnStartRequest\n"));
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcSendQueue::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
struct ipcWriteState
{
ipcMessage *msg;
PRBool complete;
};
static NS_METHOD ipcWriteMessage(nsIOutputStream *stream,
void *closure,
char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countWritten)
{
ipcWriteState *state = (ipcWriteState *) closure;
if (state->msg->WriteTo(segment, count,
countWritten, &state->complete) != PR_SUCCESS)
return NS_ERROR_UNEXPECTED;
return NS_OK;
}
NS_IMETHODIMP
ipcSendQueue::OnDataWritable(nsIRequest *request,
nsISupports *context,
nsIOutputStream *stream,
PRUint32 offset,
PRUint32 count)
{
PRUint32 n;
nsresult rv;
ipcWriteState state;
PRBool wroteSomething = PR_FALSE;
LOG(("ipcSendQueue::OnDataWritable\n"));
while (!mQueue.IsEmpty()) {
state.msg = mQueue.First();
state.complete = PR_FALSE;
rv = stream->WriteSegments(ipcWriteMessage, &state, count, &n);
if (NS_FAILED(rv))
break;
if (state.complete) {
LOG((" wrote message %u bytes\n", mQueue.First()->MsgLen()));
mQueue.DeleteFirst();
}
wroteSomething = PR_TRUE;
}
if (wroteSomething)
return NS_OK;
LOG((" suspending write request\n"));
mTransport->SetWriteSuspended(PR_TRUE);
return NS_BASE_STREAM_WOULD_BLOCK;
}
//----------------------------------------------------------------------------
// ipcReceiver
//----------------------------------------------------------------------------
@ -337,70 +247,28 @@ ipcReceiver::Release()
return mTransport->Release();
}
NS_IMPL_QUERY_INTERFACE2(ipcReceiver, nsIStreamListener, nsIRequestObserver)
NS_IMPL_QUERY_INTERFACE1(ipcReceiver, nsIInputStreamNotify)
NS_IMETHODIMP
ipcReceiver::OnStartRequest(nsIRequest *request,
nsISupports *context)
NS_METHOD
ipcReceiver::ReadSegment(nsIInputStream *stream,
void *closure,
const char *ptr,
PRUint32 offset,
PRUint32 count,
PRUint32 *countRead)
{
LOG(("ipcReceiver::OnStartRequest\n"));
ipcReceiver *self = (ipcReceiver *) closure;
if (mTransport)
mTransport->OnStartRequest(request);
return NS_OK;
}
NS_IMETHODIMP
ipcReceiver::OnStopRequest(nsIRequest *request,
nsISupports *context,
nsresult status)
{
LOG(("ipcReceiver::OnStopRequest [status=%x]\n", status));
if (mTransport)
mTransport->OnStopRequest(request, status);
return NS_OK;
}
static NS_METHOD ipcReadMessage(nsIInputStream *stream,
void *closure,
const char *segment,
PRUint32 offset,
PRUint32 count,
PRUint32 *countRead)
{
ipcReceiver *receiver = (ipcReceiver *) closure;
return receiver->ReadSegment(segment, count, countRead);
}
NS_IMETHODIMP
ipcReceiver::OnDataAvailable(nsIRequest *request,
nsISupports *context,
nsIInputStream *stream,
PRUint32 offset,
PRUint32 count)
{
LOG(("ipcReceiver::OnDataAvailable [count=%u]\n", count));
PRUint32 countRead;
return stream->ReadSegments(ipcReadMessage, this, count, &countRead);
}
nsresult
ipcReceiver::ReadSegment(const char *ptr, PRUint32 count, PRUint32 *countRead)
{
*countRead = 0;
while (count) {
PRUint32 nread;
PRBool complete;
mMsg.ReadFrom(ptr, count, &nread, &complete);
self->mMsg.ReadFrom(ptr, count, &nread, &complete);
if (complete) {
mTransport->OnMessageAvailable(&mMsg);
mMsg.Reset();
self->mTransport->OnMessageAvailable(&self->mMsg);
self->mMsg.Reset();
}
count -= nread;
@ -410,3 +278,18 @@ ipcReceiver::ReadSegment(const char *ptr, PRUint32 count, PRUint32 *countRead)
return NS_OK;
}
NS_IMETHODIMP
ipcReceiver::OnInputStreamReady(nsIAsyncInputStream *stream)
{
LOG(("ipcReceiver::OnInputStreamReady\n"));
nsresult rv;
PRUint32 n;
rv = stream->ReadSegments(ReadSegment, this, IPC_BUFFER_SEGMENT_SIZE, &n);
if (NS_FAILED(rv))
mTransport->OnConnectionLost(rv);
return NS_OK;
}

Просмотреть файл

@ -38,59 +38,36 @@
#ifndef ipcTransportUnix_h__
#define ipcTransportUnix_h__
#include "nsIStreamListener.h"
#include "nsIStreamProvider.h"
#include "nsIAsyncInputStream.h"
#include "nsIAsyncOutputStream.h"
#include "nsISocketTransport.h"
#include "prio.h"
#include "ipcMessageQ.h"
class ipcTransport;
//----------------------------------------------------------------------------
// ipcSendQueue
//----------------------------------------------------------------------------
class ipcSendQueue : public nsIStreamProvider
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMPROVIDER
ipcSendQueue(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcSendQueue() { }
void EnqueueMsg(ipcMessage *msg) { mQueue.Append(msg); }
PRBool IsEmpty() { return mQueue.IsEmpty(); }
private:
ipcTransport *mTransport;
ipcMessageQ mQueue;
};
//-----------------------------------------------------------------------------
// ipcReceiver
//-----------------------------------------------------------------------------
class ipcReceiver : public nsIStreamListener
class ipcReceiver : public nsIInputStreamNotify
{
public:
NS_DECL_ISUPPORTS_INHERITED
NS_DECL_NSIREQUESTOBSERVER
NS_DECL_NSISTREAMLISTENER
NS_DECL_NSIINPUTSTREAMNOTIFY
ipcReceiver(ipcTransport *transport)
: mTransport(transport)
{ }
virtual ~ipcReceiver() { }
nsresult ReadSegment(const char *, PRUint32 count, PRUint32 *countRead);
private:
ipcTransport *mTransport;
ipcMessage mMsg; // message in progress
static NS_METHOD ReadSegment(nsIInputStream *, void *, const char *,
PRUint32, PRUint32, PRUint32 *);
ipcTransport *mTransport;
ipcMessage mMsg; // message in progress
};
#endif // !ipcTransportUnix_h__