landing patch for bug 243107 "make ipcIService and ipcILockService threadsafe" (this code is not currently used for anything)

This commit is contained in:
darin%meer.net 2004-05-09 19:08:57 +00:00
Родитель 51e61b254e
Коммит 89fdea0832
16 изменённых файлов: 623 добавлений и 315 удалений

Просмотреть файл

@ -74,17 +74,24 @@
* Initialization and Shutdown * Initialization and Shutdown
*/ */
// XXX limit these to the main thread, and call them from our module's ctor/dtor?
/** /**
* Ensures that this process is connected to the IPC daemon. If it is already * Connects this process to the IPC daemon and initializes it for use as a
* connected, then this function call has no effect. Each call to IPC_Init * client of the IPC daemon. This function must be called once before any
* should be balanced by a call to IPC_Shutdown. A reference counter is used * other methods defined in this file can be used.
* to determine when to disconnect from the IPC daemon. *
* @returns NS_ERROR_ALREADY_INITIALIZED if IPC_Shutdown was not called since
* the last time IPC_Init was called.
*/ */
IPC_METHOD IPC_Init(); IPC_METHOD IPC_Init();
/** /**
* Disconnects this process from the IPC daemon. Must be called once for * Disconnects this process from the IPC daemon. After this function is
* every call to IPC_Init when the IPC connection is no longer needed. * called, no other methods in this file except for IPC_Init may be called.
*
* @returns NS_ERROR_NOT_INITIALIZED if IPC_Init has not been called or if
* IPC_Init did not return a success code.
*/ */
IPC_METHOD IPC_Shutdown(); IPC_METHOD IPC_Shutdown();
@ -99,16 +106,40 @@ IPC_METHOD IPC_Shutdown();
* whenever a message is sent to this target in this process. * whenever a message is sent to this target in this process.
* *
* This function has three main effects: * This function has three main effects:
* o If the message target is already defined, then this function simply resets * o If the message target is already defined, then this function simply
* its message observer. * resets its message observer.
* o If the message target is not already defined, then the IPC daemon will be * o If the message target is not already defined, then the message target
* notified of the existance of this message target. * is defined and the IPC daemon is notified of the existance of this
* message target.
* o If null is passed for the message observer, then the message target is * o If null is passed for the message observer, then the message target is
* removed, and the daemon is notified of the removal of this message target. * removed, and the daemon is notified of the removal of this message target.
*
* If aOnCurrentThread is true, then notifications to the observer will occur
* on the current thread. This means that there must be a nsIEventTarget
* associated with the calling thread. If aOnCurrentThread is false, then
* notifications to the observer will occur on a background thread. In which
* case, the observer must be threadsafe.
*/ */
IPC_METHOD IPC_DefineTarget( IPC_METHOD IPC_DefineTarget(
const nsID &aTarget, const nsID &aTarget,
ipcIMessageObserver *aObserver ipcIMessageObserver *aObserver,
PRBool aOnCurrentThread = PR_TRUE
);
/**
* Call this method to temporarily disable the message observer configured
* for a message target.
*/
IPC_METHOD IPC_DisableMessageObserver(
const nsID &aTarget
);
/**
* Call this method to re-enable the message observer configured for a
* message target that was disabled by a call to IPC_DisableMessageObserver.
*/
IPC_METHOD IPC_EnableMessageObserver(
const nsID &aTarget
); );
/** /**
@ -135,9 +166,9 @@ IPC_METHOD IPC_SendMessage(
* the IPC daemon. * the IPC daemon.
* o If aSenderID is IPC_SENDER_ANY, then this function waits for a message * o If aSenderID is IPC_SENDER_ANY, then this function waits for a message
* to be sent from any source. * to be sent from any source.
* o Otherwise, this function waits for a message to be sent by client with * o Otherwise, this function waits for a message to be sent by the client
* ID given by aSenderID. If aSenderID does not identify a valid client, * with ID given by aSenderID. If aSenderID does not identify a valid
* then this function will return an error. * client, then this function will return an error.
* *
* The aObserver parameter is interpreted as follows: * The aObserver parameter is interpreted as follows:
* o If aObserver is null, then the default message observer for the target * o If aObserver is null, then the default message observer for the target
@ -157,7 +188,8 @@ IPC_METHOD IPC_SendMessage(
* If aObserver's OnMessageAvailable function returns IPC_WAIT_NEXT_MESSAGE, * If aObserver's OnMessageAvailable function returns IPC_WAIT_NEXT_MESSAGE,
* then the function will continue blocking until the next matching message * then the function will continue blocking until the next matching message
* is received. Bypassed messages will be dispatched to the default message * is received. Bypassed messages will be dispatched to the default message
* observer when the thread's event queue is processed. * observer when the event queue, associated with the thread that called
* IPC_DefineTarget, is processed.
* *
* This function runs the risk of hanging the calling thread indefinitely if * This function runs the risk of hanging the calling thread indefinitely if
* no matching message is ever received. * no matching message is ever received.
@ -195,7 +227,7 @@ IPC_METHOD IPC_RemoveName(
); );
/** /**
* Adds client observer. * Adds client observer. Will be called on the main thread.
*/ */
IPC_METHOD IPC_AddClientObserver( IPC_METHOD IPC_AddClientObserver(
ipcIClientObserver *aObserver ipcIClientObserver *aObserver
@ -225,4 +257,31 @@ IPC_METHOD IPC_ClientExists(
PRBool *aResult PRBool *aResult
); );
/*****************************************************************************/
/**
* This class can be used to temporarily disable the default message observer
* defined for a particular message target.
*/
class ipcDisableMessageObserverForScope
{
public:
ipcDisableMessageObserverForScope(const nsID &aTarget)
: mTarget(aTarget)
{
IPC_DisableMessageObserver(mTarget);
}
~ipcDisableMessageObserverForScope()
{
IPC_EnableMessageObserver(mTarget);
}
private:
const nsID &mTarget;
};
#define IPC_DISABLE_MESSAGE_OBSERVER_FOR_SCOPE(_t) \
ipcDisableMessageObserverForScope ipc_dmo_for_scope##_t(_t)
#endif /* ipcdclient_h__ */ #endif /* ipcdclient_h__ */

Просмотреть файл

@ -49,6 +49,8 @@ class ipcMessage;
/* Platform specific IPC connection API. /* Platform specific IPC connection API.
*/ */
typedef void (* ipcCallbackFunc)(void *);
/** /**
* IPC_Connect * IPC_Connect
* *
@ -87,6 +89,20 @@ IPC_METHOD_PRIVATE IPC_Disconnect();
*/ */
IPC_METHOD_PRIVATE IPC_SendMsg(ipcMessage *msg); IPC_METHOD_PRIVATE IPC_SendMsg(ipcMessage *msg);
/**
* IPC_DoCallback
*
* This function executes a callback function on the same background thread
* that calls IPC_OnConnectionEnd and IPC_OnMessageAvailable.
*
* If this function succeeds, then the caller is guaranteed that |func| will
* be called. This guarantee is important because it allows the caller to
* free any memory associated with |arg| once |func| has been called.
*
* NOTE: This function may be called on any thread.
*/
IPC_METHOD_PRIVATE IPC_DoCallback(ipcCallbackFunc func, void *arg);
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* Cross-platform IPC connection methods. /* Cross-platform IPC connection methods.
*/ */
@ -95,9 +111,8 @@ IPC_METHOD_PRIVATE IPC_SendMsg(ipcMessage *msg);
* IPC_SpawnDaemon * IPC_SpawnDaemon
* *
* This function launches the IPC daemon process. It is called by the platform * This function launches the IPC daemon process. It is called by the platform
* specific IPC_Connect implementation. This function may be called on any * specific IPC_Connect implementation. It should not return until the daemon
* thread. It should not return until the daemon process is ready to receive * process is ready to receive a client connection or an error occurs.
* a client connection or an error occurs.
* *
* @param daemonPath * @param daemonPath
* Specifies the path to the IPC daemon executable. * Specifies the path to the IPC daemon executable.

Просмотреть файл

@ -115,10 +115,21 @@ DoSecurityCheck(PRFileDesc *fd, const char *path)
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
struct ipcCallback : public ipcListNode<ipcCallback>
{
ipcCallbackFunc func;
void *arg;
};
typedef ipcList<ipcCallback> ipcCallbackQ;
//-----------------------------------------------------------------------------
struct ipcConnectionState struct ipcConnectionState
{ {
PRLock *lock; PRLock *lock;
PRPollDesc fds[2]; PRPollDesc fds[2];
ipcCallbackQ callback_queue;
ipcMessageQ send_queue; ipcMessageQ send_queue;
PRUint32 send_offset; // amount of send_queue.First() already written. PRUint32 send_offset; // amount of send_queue.First() already written.
ipcMessage *in_msg; ipcMessage *in_msg;
@ -315,17 +326,38 @@ ConnThread(void *arg)
num = PR_Poll(s->fds, 2, PR_INTERVAL_NO_TIMEOUT); num = PR_Poll(s->fds, 2, PR_INTERVAL_NO_TIMEOUT);
if (num > 0) if (num > 0)
{ {
ipcCallbackQ cbs_to_run;
// check if something has been added to the send queue. if so, then // check if something has been added to the send queue. if so, then
// acknowledge pollable event (wait should not block), and configure // acknowledge pollable event (wait should not block), and configure
// poll flags to find out when we can write. // poll flags to find out when we can write.
//
// delay processing a shutdown request until after all queued up
// messages have been sent and until after all queued up callbacks
// have been run.
if (s->fds[POLL].out_flags & PR_POLL_READ) if (s->fds[POLL].out_flags & PR_POLL_READ)
{ {
PR_WaitForPollableEvent(s->fds[POLL].fd); PR_WaitForPollableEvent(s->fds[POLL].fd);
PR_Lock(s->lock); PR_Lock(s->lock);
PRBool delayShutdown = PR_FALSE;
if (!s->send_queue.IsEmpty()) if (!s->send_queue.IsEmpty())
{
delayShutdown = PR_TRUE;
s->fds[SOCK].in_flags |= PR_POLL_WRITE; s->fds[SOCK].in_flags |= PR_POLL_WRITE;
else if (s->shutdown) }
if (!s->callback_queue.IsEmpty())
{
delayShutdown = PR_TRUE;
s->callback_queue.MoveTo(cbs_to_run);
}
if (!delayShutdown && s->shutdown)
rv = NS_ERROR_ABORT; rv = NS_ERROR_ABORT;
PR_Unlock(s->lock); PR_Unlock(s->lock);
} }
@ -336,6 +368,14 @@ ConnThread(void *arg)
// check if we can write... // check if we can write...
if (s->fds[SOCK].out_flags & PR_POLL_WRITE) if (s->fds[SOCK].out_flags & PR_POLL_WRITE)
rv = ConnWrite(s); rv = ConnWrite(s);
// check if we have callbacks to run
while (!cbs_to_run.IsEmpty())
{
ipcCallback *cb = cbs_to_run.First();
(cb->func)(cb->arg);
cbs_to_run.DeleteFirst();
}
} }
else else
{ {
@ -510,6 +550,25 @@ IPC_SendMsg(ipcMessage *msg)
return NS_OK; return NS_OK;
} }
nsresult
IPC_DoCallback(ipcCallbackFunc func, void *arg)
{
if (!gConnState || !gConnThread)
return NS_ERROR_NOT_INITIALIZED;
ipcCallback *callback = new ipcCallback;
if (!callback)
return NS_ERROR_OUT_OF_MEMORY;
callback->func = func;
callback->arg = arg;
PR_Lock(gConnState->lock);
gConnState->callback_queue.Append(callback);
PR_SetPollableEvent(gConnState->fds[POLL].fd);
PR_Unlock(gConnState->lock);
return NS_OK;
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
#ifdef TEST_STANDALONE #ifdef TEST_STANDALONE

Просмотреть файл

@ -67,7 +67,8 @@
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
#define IPC_WM_SENDMSG (WM_USER + 0x1) #define IPC_WM_SENDMSG (WM_USER + 0x1)
#define IPC_WM_SHUTDOWN (WM_USER + 0x2) #define IPC_WM_CALLBACK (WM_USER + 0x2)
#define IPC_WM_SHUTDOWN (WM_USER + 0x3)
static nsresult ipcThreadStatus = NS_OK; static nsresult ipcThreadStatus = NS_OK;
static PRThread *ipcThread = NULL; static PRThread *ipcThread = NULL;
@ -118,6 +119,13 @@ ipcThreadWindowProc(HWND hWnd, UINT uMsg, WPARAM wParam, LPARAM lParam)
return 0; return 0;
} }
if (uMsg == IPC_WM_CALLBACK) {
ipcCallbackFunc func = (ipcCallbackFunc) wParam;
void *arg = (void *) lParam;
(func)(arg);
return 0;
}
if (uMsg == IPC_WM_SHUTDOWN) { if (uMsg == IPC_WM_SHUTDOWN) {
IPC_OnConnectionEnd(NS_OK); IPC_OnConnectionEnd(NS_OK);
PostQuitMessage(0); PostQuitMessage(0);
@ -306,3 +314,19 @@ loser:
delete msg; delete msg;
return NS_ERROR_FAILURE; return NS_ERROR_FAILURE;
} }
nsresult
IPC_DoCallback(ipcCallbackFunc func, void *arg)
{
LOG(("IPC_DoCallback\n"));
if (ipcShutdown) {
LOG(("unable to send message b/c message thread is shutdown\n"));
return NS_ERROR_FAILURE;
}
if (!PostMessage(ipcLocalHwnd, IPC_WM_CALLBACK, (WPARAM) func, (LPARAM) arg)) {
LOG((" PostMessage failed w/ error = %u\n", GetLastError()));
return NS_ERROR_FAILURE;
}
return NS_OK;
}

Просмотреть файл

@ -38,17 +38,17 @@
#include "nsIServiceManager.h" #include "nsIServiceManager.h"
#include "nsIGenericFactory.h" #include "nsIGenericFactory.h"
#include "nsICategoryManager.h" #include "nsICategoryManager.h"
#include "ipcdclient.h"
#include "ipcService.h" #include "ipcService.h"
#include "ipcConfig.h" #include "ipcConfig.h"
#include "ipcCID.h" #include "ipcCID.h"
#include "ipcLockCID.h"
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Define the contructor function for the objects // Define the contructor function for the objects
// //
// NOTE: This creates an instance of objects by using the default constructor // NOTE: This creates an instance of objects by using the default constructor
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(ipcService, Init) NS_GENERIC_FACTORY_CONSTRUCTOR(ipcService)
// enable this code to make the IPC service auto-start. // enable this code to make the IPC service auto-start.
#if 0 #if 0
@ -90,6 +90,7 @@ ipcServiceUnregisterProc(nsIComponentManager *aCompMgr,
// extensions // extensions
#include "ipcLockService.h" #include "ipcLockService.h"
#include "ipcLockCID.h"
NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(ipcLockService, Init) NS_GENERIC_FACTORY_CONSTRUCTOR_INIT(ipcLockService, Init)
#include "tmTransactionService.h" #include "tmTransactionService.h"
@ -167,8 +168,24 @@ static const nsModuleComponentInfo components[] = {
#endif #endif
}; };
//-----------------------------------------------------------------------------
PR_STATIC_CALLBACK(nsresult)
ipcdclient_init(nsIModule *module)
{
return IPC_Init();
}
PR_STATIC_CALLBACK(void)
ipcdclient_shutdown(nsIModule *module)
{
IPC_Shutdown();
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Implement the NSGetModule() exported function for your module // Implement the NSGetModule() exported function for your module
// and the entire implementation of the module object. // and the entire implementation of the module object.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
NS_IMPL_NSGETMODULE(ipcd, components) NS_IMPL_NSGETMODULE_WITH_CTOR_DTOR(ipcdclient, components,
ipcdclient_init,
ipcdclient_shutdown)

Просмотреть файл

@ -37,7 +37,10 @@
#include "ipcService.h" #include "ipcService.h"
NS_IMPL_ISUPPORTS1(ipcService, ipcIService) // The ipcService implementation is nothing more than a thin XPCOM wrapper
// around the ipcdclient.h API.
NS_IMPL_THREADSAFE_ISUPPORTS1(ipcService, ipcIService)
NS_IMETHODIMP NS_IMETHODIMP
ipcService::GetID(PRUint32 *aID) ipcService::GetID(PRUint32 *aID)

Просмотреть файл

@ -46,11 +46,6 @@ class ipcService : public ipcIService
public: public:
NS_DECL_ISUPPORTS NS_DECL_ISUPPORTS
NS_DECL_IPCISERVICE NS_DECL_IPCISERVICE
NS_HIDDEN_(nsresult) Init() { return IPC_Init(); }
private:
~ipcService() { IPC_Shutdown(); }
}; };
#endif // !defined( ipcService_h__ ) #endif // !defined( ipcService_h__ )

Просмотреть файл

@ -56,7 +56,6 @@
#include "prio.h" #include "prio.h"
#include "prproces.h" #include "prproces.h"
#include "prlock.h"
#include "pratom.h" #include "pratom.h"
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
@ -68,25 +67,36 @@
class ipcTargetData class ipcTargetData
{ {
public: public:
static NS_HIDDEN_(ipcTargetData*) Create(ipcIMessageObserver *aObserver); static NS_HIDDEN_(ipcTargetData*) Create();
// threadsafe addref/release // threadsafe addref/release
NS_HIDDEN_(nsrefcnt) AddRef() { return PR_AtomicIncrement(&refcnt); } NS_HIDDEN_(nsrefcnt) AddRef() { return PR_AtomicIncrement(&refcnt); }
NS_HIDDEN_(nsrefcnt) Release() { PRInt32 r = PR_AtomicDecrement(&refcnt); if (r == 0) delete this; return r; } NS_HIDDEN_(nsrefcnt) Release() { PRInt32 r = PR_AtomicDecrement(&refcnt); if (r == 0) delete this; return r; }
NS_HIDDEN_(void) SetObserver(ipcIMessageObserver *aObserver, PRBool aOnCurrentThread);
// protects access to the members of this class // protects access to the members of this class
PRMonitor *monitor; PRMonitor *monitor;
// this may be null // this may be null
nsCOMPtr<ipcIMessageObserver> observer; nsCOMPtr<ipcIMessageObserver> observer;
// the message observer is called via this event queue
nsCOMPtr<nsIEventQueue> eventQ;
// incoming messages are added to this list // incoming messages are added to this list
ipcMessageQ pendingQ; ipcMessageQ pendingQ;
// non-zero if the observer has been disabled (this means that new messages
// should not be dispatched to the observer until the observer is re-enabled
// via IPC_EnableMessageObserver).
PRInt32 observerDisabled;
private: private:
ipcTargetData() ipcTargetData()
: monitor(PR_NewMonitor()) : monitor(PR_NewMonitor())
, observerDisabled(0)
, refcnt(0) , refcnt(0)
{} {}
@ -100,7 +110,7 @@ private:
}; };
ipcTargetData * ipcTargetData *
ipcTargetData::Create(ipcIMessageObserver *aObserver) ipcTargetData::Create()
{ {
ipcTargetData *td = new ipcTargetData; ipcTargetData *td = new ipcTargetData;
if (!td) if (!td)
@ -111,11 +121,20 @@ ipcTargetData::Create(ipcIMessageObserver *aObserver)
delete td; delete td;
return NULL; return NULL;
} }
td->observer = aObserver;
return td; return td;
} }
void
ipcTargetData::SetObserver(ipcIMessageObserver *aObserver, PRBool aOnCurrentThread)
{
observer = aObserver;
if (aOnCurrentThread)
NS_GetCurrentEventQ(getter_AddRefs(eventQ));
else
eventQ = nsnull;
}
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
typedef nsRefPtrHashtable<nsIDHashKey, ipcTargetData> ipcTargetMap; typedef nsRefPtrHashtable<nsIDHashKey, ipcTargetData> ipcTargetMap;
@ -127,12 +146,19 @@ public:
~ipcClientState() ~ipcClientState()
{ {
if (lock) if (monitor)
PR_DestroyLock(lock); PR_DestroyMonitor(monitor);
} }
// this lock protects the targetMap and the connected flag. //
PRLock *lock; // the monitor protects the targetMap and the connected flag.
//
// NOTE: we use a PRMonitor for this instead of a PRLock because we need
// the lock to be re-entrant. since we don't ever need to wait on
// this monitor, it might be worth it to implement a re-entrant
// wrapper for PRLock.
//
PRMonitor *monitor;
ipcTargetMap targetMap; ipcTargetMap targetMap;
PRBool connected; PRBool connected;
@ -144,7 +170,7 @@ public:
private: private:
ipcClientState() ipcClientState()
: lock(PR_NewLock()) : monitor(PR_NewMonitor())
, connected(PR_FALSE) , connected(PR_FALSE)
, selfID(0) , selfID(0)
{} {}
@ -157,7 +183,7 @@ ipcClientState::Create()
if (!cs) if (!cs)
return NULL; return NULL;
if (!cs->lock || !cs->targetMap.Init()) if (!cs->monitor || !cs->targetMap.Init())
{ {
delete cs; delete cs;
return NULL; return NULL;
@ -168,27 +194,26 @@ ipcClientState::Create()
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
static PRInt32 gInitCount;
static ipcClientState *gClientState; static ipcClientState *gClientState;
static PRBool static PRBool
GetTarget(const nsID &aTarget, ipcTargetData **td) GetTarget(const nsID &aTarget, ipcTargetData **td)
{ {
nsAutoLock lock(gClientState->lock); nsAutoMonitor mon(gClientState->monitor);
return gClientState->targetMap.Get(nsIDHashKey(&aTarget).GetKey(), td); return gClientState->targetMap.Get(nsIDHashKey(&aTarget).GetKey(), td);
} }
static PRBool static PRBool
PutTarget(const nsID &aTarget, ipcTargetData *td) PutTarget(const nsID &aTarget, ipcTargetData *td)
{ {
nsAutoLock lock(gClientState->lock); nsAutoMonitor mon(gClientState->monitor);
return gClientState->targetMap.Put(nsIDHashKey(&aTarget).GetKey(), td); return gClientState->targetMap.Put(nsIDHashKey(&aTarget).GetKey(), td);
} }
static void static void
DelTarget(const nsID &aTarget) DelTarget(const nsID &aTarget)
{ {
nsAutoLock lock(gClientState->lock); nsAutoMonitor mon(gClientState->monitor);
gClientState->targetMap.Remove(nsIDHashKey(&aTarget).GetKey()); gClientState->targetMap.Remove(nsIDHashKey(&aTarget).GetKey());
} }
@ -222,6 +247,11 @@ ProcessPendingQ(const nsID &aTarget)
if (GetTarget(aTarget, getter_AddRefs(td))) if (GetTarget(aTarget, getter_AddRefs(td)))
{ {
nsAutoMonitor mon(td->monitor); nsAutoMonitor mon(td->monitor);
// if the observer for this target has been temporarily disabled, then
// we must not processing any pending messages at this time.
if (!td->observerDisabled)
td->pendingQ.MoveTo(tempQ); td->pendingQ.MoveTo(tempQ);
} }
@ -249,14 +279,17 @@ ProcessPendingQ(const nsID &aTarget)
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
// WaitTarget enables support for multiple threads blocking on the same // WaitTarget enables support for multiple threads blocking on the same
// message target. This functionality does not need to be exposed in our // message target. the selector is called while inside the target's monitor.
// public API because targets are meant to be single threaded. This
// functionality only exists to support the IPCM protocol.
typedef PRBool (* ipcMessageSelector)(void *aArg, ipcTargetData *aTD, const ipcMessage *aMsg); typedef PRBool (* ipcMessageSelector)(
void *arg,
ipcTargetData *td,
const ipcMessage *msg
);
// selects any
static PRBool static PRBool
DefaultSelector(void *aArg, ipcTargetData *aTD, const ipcMessage *aMsg) DefaultSelector(void *arg, ipcTargetData *td, const ipcMessage *msg)
{ {
return PR_TRUE; return PR_TRUE;
} }
@ -299,66 +332,50 @@ WaitTarget(const nsID &aTarget,
while (gClientState->connected) while (gClientState->connected)
{ {
if (!lastChecked) NS_ASSERTION(!lastChecked, "oops");
{
if (beforeLastChecked) if (beforeLastChecked)
{
// verify that beforeLastChecked is still in the queue since it might
// have been removed while we were asleep on the monitor. we must not
// dereference it until we have verified this.
PRBool isValid = PR_FALSE;
for (ipcMessage *iter = td->pendingQ.First(); iter; iter=iter->mNext)
{
if (iter == beforeLastChecked)
{
isValid = PR_TRUE;
break;
}
}
if (isValid)
lastChecked = beforeLastChecked->mNext; lastChecked = beforeLastChecked->mNext;
else else
{
lastChecked = td->pendingQ.First(); lastChecked = td->pendingQ.First();
beforeLastChecked = nsnull;
} }
else if (lastChecked->mNext) }
lastChecked = lastChecked->mNext; else
lastChecked = td->pendingQ.First();
// loop over pending queue until we find a message that our selector likes.
while (lastChecked) while (lastChecked)
{ {
// remove this message from the pending queue. we'll put it back if if ((aSelector)(aArg, td, lastChecked))
// it is not selected. we need to do this to allow the selector {
// function to make calls back into our code. for example, it might // remove from pending queue
// try to undefine this message target!
if (beforeLastChecked) if (beforeLastChecked)
td->pendingQ.RemoveAfter(beforeLastChecked); td->pendingQ.RemoveAfter(beforeLastChecked);
else else
td->pendingQ.RemoveFirst(); td->pendingQ.RemoveFirst();
lastChecked->mNext = nsnull; lastChecked->mNext = nsnull;
mon.Exit();
PRBool selected = (aSelector)(aArg, td, lastChecked);
mon.Enter();
if (selected)
{
*aMsg = lastChecked; *aMsg = lastChecked;
break; break;
} }
// re-insert message into the pending queue. the only possible change
// that could have happened to the pending queue while we were in the
// callback is the addition of more messages (added to the end of the
// queue). our beforeLastChecked "iterator" must still be valid.
#ifdef DEBUG
// scan td->pendingQ to ensure that beforeLastChecked is still valid.
if (beforeLastChecked)
{
PRBool found = PR_FALSE;
for (ipcMessage *iter = td->pendingQ.First(); iter; iter = iter->mNext)
{
if (iter == beforeLastChecked)
{
found = PR_TRUE;
break;
}
}
NS_ASSERTION(found, "iterator is invalid");
}
#endif
if (beforeLastChecked)
td->pendingQ.InsertAfter(beforeLastChecked, lastChecked);
else
td->pendingQ.Prepend(lastChecked);
beforeLastChecked = lastChecked; beforeLastChecked = lastChecked;
lastChecked = lastChecked->mNext; lastChecked = lastChecked->mNext;
} }
@ -390,6 +407,144 @@ WaitTarget(const nsID &aTarget,
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
static void
PostEvent(nsIEventTarget *eventTarget, PLEvent *ev)
{
if (!ev)
return;
nsresult rv = eventTarget->PostEvent(ev);
if (NS_FAILED(rv))
{
NS_WARNING("PostEvent failed");
PL_DestroyEvent(ev);
}
}
static void
PostEventToMainThread(PLEvent *ev)
{
nsCOMPtr<nsIEventQueue> eventQ;
NS_GetMainEventQ(getter_AddRefs(eventQ));
if (!eventQ)
{
NS_WARNING("unable to get reference to main event queue");
PL_DestroyEvent(ev);
return;
}
PostEvent(eventQ, ev);
}
/* ------------------------------------------------------------------------- */
class ipcEvent_ClientState : public PLEvent
{
public:
ipcEvent_ClientState(PRUint32 aClientID, PRUint32 aClientState)
: mClientID(aClientID)
, mClientState(aClientState)
{
PL_InitEvent(this, nsnull, HandleEvent, DestroyEvent);
}
PR_STATIC_CALLBACK(void *) HandleEvent(PLEvent *ev)
{
// maybe we've been shutdown!
if (!gClientState)
return nsnull;
ipcEvent_ClientState *self = (ipcEvent_ClientState *) ev;
for (PRInt32 i=0; i<gClientState->clientObservers.Count(); ++i)
gClientState->clientObservers[i]->OnClientStateChange(self->mClientID,
self->mClientState);
return nsnull;
}
PR_STATIC_CALLBACK(void) DestroyEvent(PLEvent *ev)
{
delete (ipcEvent_ClientState *) ev;
}
private:
PRUint32 mClientID;
PRUint32 mClientState;
};
/* ------------------------------------------------------------------------- */
class ipcEvent_ProcessPendingQ : public PLEvent
{
public:
ipcEvent_ProcessPendingQ(const nsID &aTarget)
: mTarget(aTarget)
{
PL_InitEvent(this, nsnull, HandleEvent, DestroyEvent);
}
PR_STATIC_CALLBACK(void *) HandleEvent(PLEvent *ev)
{
ProcessPendingQ(((ipcEvent_ProcessPendingQ *) ev)->mTarget);
return nsnull;
}
PR_STATIC_CALLBACK(void) DestroyEvent(PLEvent *ev)
{
delete (ipcEvent_ProcessPendingQ *) ev;
}
private:
const nsID mTarget;
};
static void
CallProcessPendingQ(const nsID &target, ipcTargetData *td)
{
// we assume that we are inside td's monitor
PLEvent *ev = new ipcEvent_ProcessPendingQ(target);
if (!ev)
return;
nsresult rv;
if (td->eventQ)
rv = td->eventQ->PostEvent(ev);
else
rv = IPC_DoCallback((ipcCallbackFunc) PL_HandleEvent, ev);
if (NS_FAILED(rv))
PL_DestroyEvent(ev);
}
/* ------------------------------------------------------------------------- */
static void
DisableMessageObserver(const nsID &aTarget)
{
nsRefPtr<ipcTargetData> td;
if (GetTarget(aTarget, getter_AddRefs(td)))
{
nsAutoMonitor mon(td->monitor);
++td->observerDisabled;
}
}
static void
EnableMessageObserver(const nsID &aTarget)
{
nsRefPtr<ipcTargetData> td;
if (GetTarget(aTarget, getter_AddRefs(td)))
{
nsAutoMonitor mon(td->monitor);
if (td->observerDisabled > 0 && --td->observerDisabled == 0)
if (!td->pendingQ.IsEmpty())
CallProcessPendingQ(aTarget, td);
}
}
/* ------------------------------------------------------------------------- */
// selects the next IPCM message with matching request index // selects the next IPCM message with matching request index
static PRBool static PRBool
WaitIPCMResponseSelector(void *arg, ipcTargetData *td, const ipcMessage *msg) WaitIPCMResponseSelector(void *arg, ipcTargetData *td, const ipcMessage *msg)
@ -438,9 +593,18 @@ MakeIPCMRequest(ipcMessage *msg, ipcMessage **responseMsg = nsnull)
PRUint32 requestIndex = IPCM_GetRequestIndex(msg); PRUint32 requestIndex = IPCM_GetRequestIndex(msg);
// suppress 'ProcessPendingQ' for IPCM messages until we receive the
// response to this IPCM request. if we did not do this then there
// would be a race condition leading to the possible removal of our
// response from the pendingQ between sending the request and waiting
// for the response.
DisableMessageObserver(IPCM_TARGET);
nsresult rv = IPC_SendMsg(msg); nsresult rv = IPC_SendMsg(msg);
if (NS_SUCCEEDED(rv)) if (NS_SUCCEEDED(rv))
rv = WaitIPCMResponse(requestIndex, responseMsg); rv = WaitIPCMResponse(requestIndex, responseMsg);
EnableMessageObserver(IPCM_TARGET);
return rv; return rv;
} }
@ -462,14 +626,16 @@ RemoveTarget(const nsID &aTarget, PRBool aNotifyDaemon)
static nsresult static nsresult
DefineTarget(const nsID &aTarget, DefineTarget(const nsID &aTarget,
ipcIMessageObserver *aObserver, ipcIMessageObserver *aObserver,
PRBool aOnCurrentThread,
PRBool aNotifyDaemon, PRBool aNotifyDaemon,
ipcTargetData **aResult) ipcTargetData **aResult)
{ {
nsresult rv; nsresult rv;
nsRefPtr<ipcTargetData> td( ipcTargetData::Create(aObserver) ); nsRefPtr<ipcTargetData> td( ipcTargetData::Create() );
if (!td) if (!td)
return NS_ERROR_OUT_OF_MEMORY; return NS_ERROR_OUT_OF_MEMORY;
td->SetObserver(aObserver, aOnCurrentThread);
if (!PutTarget(aTarget, td)) if (!PutTarget(aTarget, td))
return NS_ERROR_OUT_OF_MEMORY; return NS_ERROR_OUT_OF_MEMORY;
@ -506,7 +672,7 @@ TryConnect()
gClientState->connected = PR_TRUE; gClientState->connected = PR_TRUE;
rv = DefineTarget(IPCM_TARGET, nsnull, PR_FALSE, nsnull); rv = DefineTarget(IPCM_TARGET, nsnull, PR_FALSE, PR_FALSE, nsnull);
if (NS_FAILED(rv)) if (NS_FAILED(rv))
return rv; return rv;
@ -533,8 +699,7 @@ TryConnect()
nsresult nsresult
IPC_Init() IPC_Init()
{ {
if (gInitCount > 0) NS_ENSURE_TRUE(!gClientState, NS_ERROR_ALREADY_INITIALIZED);
return NS_OK;
IPC_InitLog(">>>"); IPC_InitLog(">>>");
@ -542,9 +707,6 @@ IPC_Init()
if (!gClientState) if (!gClientState)
return NS_ERROR_OUT_OF_MEMORY; return NS_ERROR_OUT_OF_MEMORY;
// IPC_Shutdown will decrement
gInitCount++;
nsresult rv = TryConnect(); nsresult rv = TryConnect();
if (NS_FAILED(rv)) if (NS_FAILED(rv))
IPC_Shutdown(); IPC_Shutdown();
@ -555,16 +717,13 @@ IPC_Init()
nsresult nsresult
IPC_Shutdown() IPC_Shutdown()
{ {
NS_ENSURE_TRUE(gInitCount, NS_ERROR_NOT_INITIALIZED); NS_ENSURE_TRUE(gClientState, NS_ERROR_NOT_INITIALIZED);
if (--gInitCount > 0)
return NS_OK;
if (gClientState->connected) if (gClientState->connected)
IPC_Disconnect(); IPC_Disconnect();
delete gClientState; delete gClientState;
gClientState = NULL; gClientState = nsnull;
return NS_OK; return NS_OK;
} }
@ -573,7 +732,8 @@ IPC_Shutdown()
nsresult nsresult
IPC_DefineTarget(const nsID &aTarget, IPC_DefineTarget(const nsID &aTarget,
ipcIMessageObserver *aObserver) ipcIMessageObserver *aObserver,
PRBool aOnCurrentThread)
{ {
NS_ENSURE_TRUE(gClientState, NS_ERROR_NOT_INITIALIZED); NS_ENSURE_TRUE(gClientState, NS_ERROR_NOT_INITIALIZED);
@ -590,11 +750,11 @@ IPC_DefineTarget(const nsID &aTarget,
// the observer is released on the main thread. // the observer is released on the main thread.
{ {
nsAutoMonitor mon(td->monitor); nsAutoMonitor mon(td->monitor);
td->observer = aObserver; td->SetObserver(aObserver, aOnCurrentThread);
} }
// remove target outside of td's monitor to avoid holding the monitor // remove target outside of td's monitor to avoid holding the monitor
// while entering the client state's lock. // while entering the client state's monitor.
if (!aObserver) if (!aObserver)
RemoveTarget(aTarget, PR_TRUE); RemoveTarget(aTarget, PR_TRUE);
@ -603,7 +763,7 @@ IPC_DefineTarget(const nsID &aTarget,
else else
{ {
if (aObserver) if (aObserver)
rv = DefineTarget(aTarget, aObserver, PR_TRUE, nsnull); rv = DefineTarget(aTarget, aObserver, aOnCurrentThread, PR_TRUE, nsnull);
else else
rv = NS_ERROR_INVALID_ARG; // unknown target rv = NS_ERROR_INVALID_ARG; // unknown target
} }
@ -611,6 +771,32 @@ IPC_DefineTarget(const nsID &aTarget,
return rv; return rv;
} }
nsresult
IPC_DisableMessageObserver(const nsID &aTarget)
{
NS_ENSURE_TRUE(gClientState, NS_ERROR_NOT_INITIALIZED);
// do not permit modifications to the IPCM protocol's target.
if (aTarget.Equals(IPCM_TARGET))
return NS_ERROR_INVALID_ARG;
DisableMessageObserver(aTarget);
return NS_OK;
}
nsresult
IPC_EnableMessageObserver(const nsID &aTarget)
{
NS_ENSURE_TRUE(gClientState, NS_ERROR_NOT_INITIALIZED);
// do not permit modifications to the IPCM protocol's target.
if (aTarget.Equals(IPCM_TARGET))
return NS_ERROR_INVALID_ARG;
EnableMessageObserver(aTarget);
return NS_OK;
}
nsresult nsresult
IPC_SendMessage(PRUint32 aReceiverID, IPC_SendMessage(PRUint32 aReceiverID,
const nsID &aTarget, const nsID &aTarget,
@ -797,10 +983,10 @@ IPC_ClientExists(PRUint32 aClientID, PRBool *aResult)
nsresult nsresult
IPC_SpawnDaemon(const char *path) IPC_SpawnDaemon(const char *path)
{ {
PRFileDesc *readable = NULL, *writable = NULL; PRFileDesc *readable = nsnull, *writable = nsnull;
PRProcessAttr *attr = NULL; PRProcessAttr *attr = nsnull;
nsresult rv = NS_ERROR_FAILURE; nsresult rv = NS_ERROR_FAILURE;
char *const argv[] = { (char *const) path, NULL }; char *const argv[] = { (char *const) path, nsnull };
char c; char c;
// setup an anonymous pipe that we can use to determine when the daemon // setup an anonymous pipe that we can use to determine when the daemon
@ -819,7 +1005,7 @@ IPC_SpawnDaemon(const char *path)
if (PR_ProcessAttrSetInheritableFD(attr, writable, IPC_STARTUP_PIPE_NAME) != PR_SUCCESS) if (PR_ProcessAttrSetInheritableFD(attr, writable, IPC_STARTUP_PIPE_NAME) != PR_SUCCESS)
goto end; goto end;
if (PR_CreateProcessDetached(path, argv, NULL, attr) != PR_SUCCESS) if (PR_CreateProcessDetached(path, argv, nsnull, attr) != PR_SUCCESS)
goto end; goto end;
if ((PR_Read(readable, &c, 1) != 1) && (c != IPC_STARTUP_PIPE_MAGIC)) if ((PR_Read(readable, &c, 1) != 1) && (c != IPC_STARTUP_PIPE_MAGIC))
@ -838,89 +1024,6 @@ end:
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
class ipcEvent_ClientState : public PLEvent
{
public:
ipcEvent_ClientState(PRUint32 aClientID, PRUint32 aClientState)
: mClientID(aClientID)
, mClientState(aClientState)
{
PL_InitEvent(this, nsnull, HandleEvent, DestroyEvent);
}
PR_STATIC_CALLBACK(void *) HandleEvent(PLEvent *ev)
{
// maybe we've been shutdown!
if (!gClientState)
return nsnull;
ipcEvent_ClientState *self = (ipcEvent_ClientState *) ev;
for (PRInt32 i=0; i<gClientState->clientObservers.Count(); ++i)
gClientState->clientObservers[i]->OnClientStateChange(self->mClientID,
self->mClientState);
return nsnull;
}
PR_STATIC_CALLBACK(void) DestroyEvent(PLEvent *ev)
{
delete (ipcEvent_ClientState *) ev;
}
private:
PRUint32 mClientID;
PRUint32 mClientState;
};
/* ------------------------------------------------------------------------- */
class ipcEvent_ProcessPendingQ : public PLEvent
{
public:
ipcEvent_ProcessPendingQ(const nsID &aTarget)
: mTarget(aTarget)
{
PL_InitEvent(this, nsnull, HandleEvent, DestroyEvent);
}
PR_STATIC_CALLBACK(void *) HandleEvent(PLEvent *ev)
{
ProcessPendingQ(((ipcEvent_ProcessPendingQ *) ev)->mTarget);
return nsnull;
}
PR_STATIC_CALLBACK(void) DestroyEvent(PLEvent *ev)
{
delete (ipcEvent_ProcessPendingQ *) ev;
}
private:
const nsID mTarget;
};
/* ------------------------------------------------------------------------- */
static void
PostEvent(PLEvent *ev)
{
if (!ev)
return;
nsCOMPtr<nsIEventQueue> eventQ;
NS_GetMainEventQ(getter_AddRefs(eventQ));
if (!eventQ)
return;
nsresult rv = eventQ->PostEvent(ev);
if (NS_FAILED(rv))
{
NS_WARNING("PostEvent failed");
PL_DestroyEvent(ev);
}
}
/* ------------------------------------------------------------------------- */
PR_STATIC_CALLBACK(PLDHashOperator) PR_STATIC_CALLBACK(PLDHashOperator)
EnumerateTargetMapAndNotify(const nsID &aKey, EnumerateTargetMapAndNotify(const nsID &aKey,
ipcTargetData *aData, ipcTargetData *aData,
@ -928,12 +1031,8 @@ EnumerateTargetMapAndNotify(const nsID &aKey,
{ {
nsAutoMonitor mon(aData->monitor); nsAutoMonitor mon(aData->monitor);
// this flag needs to be set while we are inside the monitor, since it is one
// of the conditions under which WaitTarget may block waiting for messages.
gClientState->connected = PR_FALSE;
// wake up anyone waiting on this target. // wake up anyone waiting on this target.
mon.Notify(); mon.NotifyAll();
return PL_DHASH_NEXT; return PL_DHASH_NEXT;
} }
@ -945,7 +1044,8 @@ IPC_OnConnectionEnd(nsresult error)
// now, go through the target map, and tickle each monitor. that should // now, go through the target map, and tickle each monitor. that should
// unblock any calls to WaitTarget. // unblock any calls to WaitTarget.
nsAutoLock lock(gClientState->lock); nsAutoMonitor mon(gClientState->monitor);
gClientState->connected = PR_FALSE;
gClientState->targetMap.EnumerateRead(EnumerateTargetMapAndNotify, nsnull); gClientState->targetMap.EnumerateRead(EnumerateTargetMapAndNotify, nsnull);
} }
@ -978,7 +1078,7 @@ IPC_OnMessageAvailable(ipcMessage *msg)
case IPCM_MSG_PSH_CLIENT_STATE: case IPCM_MSG_PSH_CLIENT_STATE:
{ {
ipcMessageCast<ipcmMessageClientState> status(msg); ipcMessageCast<ipcmMessageClientState> status(msg);
PostEvent(new ipcEvent_ClientState(status->ClientID(), PostEventToMainThread(new ipcEvent_ClientState(status->ClientID(),
status->ClientState())); status->ClientState()));
return; return;
} }
@ -990,17 +1090,23 @@ IPC_OnMessageAvailable(ipcMessage *msg)
{ {
nsAutoMonitor mon(td->monitor); nsAutoMonitor mon(td->monitor);
// we only want to dispatch a 'ProcessPendingQ' event if we have not
// already done so.
PRBool dispatchEvent = td->pendingQ.IsEmpty(); PRBool dispatchEvent = td->pendingQ.IsEmpty();
// put this message on our pending queue // put this message on our pending queue
td->pendingQ.Append(msg); td->pendingQ.Append(msg);
// make copy of target since |msg| may end up pointing to free'd memory
// once we notify the monitor.
const nsID target = msg->Target();
// wake up anyone waiting on this queue // wake up anyone waiting on this queue
mon.Notify(); mon.Notify();
// proxy call to target's message procedure // proxy call to target's message procedure
if (dispatchEvent) if (dispatchEvent)
PostEvent(new ipcEvent_ProcessPendingQ(msg->Target())); CallProcessPendingQ(target, td);
} }
else else
{ {

Просмотреть файл

@ -40,34 +40,25 @@
interface ipcILockNotify; interface ipcILockNotify;
/** /**
* This service provides named interprocess locking with either synchronous * This service provides named interprocess locking.
* or asynchronous waiting.
*/ */
[scriptable, uuid(9f6dbe15-d851-4b00-912a-5ac0be88a409)] [scriptable, uuid(9f6dbe15-d851-4b00-912a-5ac0be88a409)]
interface ipcILockService : nsISupports interface ipcILockService : nsISupports
{ {
/** /**
* Call this method to acquire a named lock. Pass a notification handler * Call this method to acquire a named interprocess lock.
* to be notified asynchronously when the lock is acquired. Otherwise,
* this function will block until the lock is acquired.
* *
* @param aLockName * @param aLockName
* specifies the name of the lock * specifies the name of the lock
* @param aNotify
* notification callback (NULL to synchronously acquire lock)
* @param aWaitIfBusy * @param aWaitIfBusy
* wait for the lock to become available; otherwise, fail if lock * wait for the lock to become available; otherwise, fail if lock
* is already held by some other process. * is already held by some other process.
*/ */
void acquireLock(in string aLockName, void acquireLock(in string aLockName,
in ipcILockNotify aNotify,
in boolean aWaitIfBusy); in boolean aWaitIfBusy);
/** /**
* Call this method to release a named lock. This method can be called * Call this method to release a named lock.
* before OnAcquireLockComplete has been called, which will effectively
* cancel the request to acquire the named lock. OnAcquireLockComplete
* will not be called after a call to ReleaseLock.
* *
* @param aLockName * @param aLockName
* specifies the name of the lock * specifies the name of the lock

Просмотреть файл

@ -66,7 +66,7 @@ IPC_FlattenLockMsg(const ipcLockMsg *msg, PRUint32 *bufLen)
+ strlen(msg->key) // key + strlen(msg->key) // key
+ 1; // null terminator + 1; // null terminator
PRUint8 *buf = (PRUint8 *) malloc(len); PRUint8 *buf = (PRUint8 *) ::operator new(len);
if (!buf) if (!buf)
return NULL; return NULL;

Просмотреть файл

@ -1,4 +1,4 @@
#include <stdlib.h> /* vim:set ts=4 sw=4 sts=4 et cindent: */
/* ***** BEGIN LICENSE BLOCK ***** /* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
* *
@ -36,7 +36,10 @@
* *
* ***** END LICENSE BLOCK ***** */ * ***** END LICENSE BLOCK ***** */
#include "nsIServiceManager.h" #include <stdlib.h>
#include "nsDependentString.h"
#include "nsHashKeys.h"
#include "nsAutoPtr.h"
#include "ipcILockNotify.h" #include "ipcILockNotify.h"
#include "ipcLockService.h" #include "ipcLockService.h"
#include "ipcLockProtocol.h" #include "ipcLockProtocol.h"
@ -49,22 +52,23 @@ static const nsID kLockTargetID = IPC_LOCK_TARGETID;
nsresult nsresult
ipcLockService::Init() ipcLockService::Init()
{ {
nsresult rv; if (!mResultMap.Init())
return NS_ERROR_OUT_OF_MEMORY;
rv = IPC_Init(); // Configure OnMessageAvailable to be called on the IPC thread. This is
if (NS_FAILED(rv)) // done to allow us to proxy OnAcquireLockComplete events to the right
return rv; // thread immediately even if the main thread is blocked waiting to acquire
// some other lock synchronously.
return IPC_DefineTarget(kLockTargetID, this); return IPC_DefineTarget(kLockTargetID, this, PR_FALSE);
} }
NS_IMPL_ISUPPORTS2(ipcLockService, ipcILockService, ipcIMessageObserver) NS_IMPL_THREADSAFE_ISUPPORTS2(ipcLockService, ipcILockService, ipcIMessageObserver)
NS_IMETHODIMP NS_IMETHODIMP
ipcLockService::AcquireLock(const char *lockName, ipcILockNotify *notify, PRBool waitIfBusy) ipcLockService::AcquireLock(const char *lockName, PRBool waitIfBusy)
{ {
LOG(("ipcLockService::AcquireLock [lock=%s sync=%u wait=%u]\n", LOG(("ipcLockService::AcquireLock [lock=%s wait=%u]\n", lockName, waitIfBusy));
lockName, notify == nsnull, waitIfBusy));
ipcLockMsg msg; ipcLockMsg msg;
msg.opcode = IPC_LOCK_OP_ACQUIRE; msg.opcode = IPC_LOCK_OP_ACQUIRE;
@ -72,30 +76,31 @@ ipcLockService::AcquireLock(const char *lockName, ipcILockNotify *notify, PRBool
msg.key = lockName; msg.key = lockName;
PRUint32 bufLen; PRUint32 bufLen;
PRUint8 *buf = IPC_FlattenLockMsg(&msg, &bufLen); nsAutoPtr<PRUint8> buf( IPC_FlattenLockMsg(&msg, &bufLen) );
if (!buf) if (!buf)
return NS_ERROR_OUT_OF_MEMORY; return NS_ERROR_OUT_OF_MEMORY;
nsresult lockStatus = NS_ERROR_UNEXPECTED;
nsDependentCString lockNameStr(lockName);
nsCStringHashKey hashKey(&lockNameStr);
if (!mResultMap.Put(hashKey.GetKey(), &lockStatus))
return NS_ERROR_OUT_OF_MEMORY;
// prevent our OnMessageAvailable from being called until we explicitly ask
// for it to be called via IPC_WaitMessage.
IPC_DISABLE_MESSAGE_OBSERVER_FOR_SCOPE(kLockTargetID);
nsresult rv = IPC_SendMessage(0, kLockTargetID, buf, bufLen); nsresult rv = IPC_SendMessage(0, kLockTargetID, buf, bufLen);
free(buf); if (NS_SUCCEEDED(rv)) {
if (NS_FAILED(rv)) {
LOG((" SendMessage failed [rv=%x]\n", rv));
return rv;
}
if (notify) {
nsCStringKey hashKey(lockName);
mPendingTable.Put(&hashKey, notify);
return NS_OK;
}
// block the calling thread until we get a response from the daemon // block the calling thread until we get a response from the daemon
rv = IPC_WaitMessage(0, kLockTargetID, this, PR_INTERVAL_NO_TIMEOUT);
if (NS_SUCCEEDED(rv))
rv = lockStatus;
}
mSyncLockName = lockName; mResultMap.Remove(hashKey.GetKey());
rv = IPC_WaitMessage(0, kLockTargetID, nsnull, PR_INTERVAL_NO_TIMEOUT); return rv;
mSyncLockName = nsnull;
return NS_FAILED(rv) ? rv : mSyncLockStatus;
} }
NS_IMETHODIMP NS_IMETHODIMP
@ -114,12 +119,11 @@ ipcLockService::ReleaseLock(const char *lockName)
return NS_ERROR_OUT_OF_MEMORY; return NS_ERROR_OUT_OF_MEMORY;
nsresult rv = IPC_SendMessage(0, kLockTargetID, buf, bufLen); nsresult rv = IPC_SendMessage(0, kLockTargetID, buf, bufLen);
free(buf); delete buf;
if (NS_FAILED(rv)) return rv; if (NS_FAILED(rv))
return rv;
nsCStringKey hashKey(lockName);
mPendingTable.Remove(&hashKey);
return NS_OK; return NS_OK;
} }
@ -130,37 +134,18 @@ ipcLockService::OnMessageAvailable(PRUint32 unused, const nsID &target,
ipcLockMsg msg; ipcLockMsg msg;
IPC_UnflattenLockMsg(data, dataLen, &msg); IPC_UnflattenLockMsg(data, dataLen, &msg);
LOG(("ipcLockService::OnMessageAvailable [lock=%s opcode=%u sync-lock=%s]\n", msg.key, msg.opcode, mSyncLockName)); LOG(("ipcLockService::OnMessageAvailable [lock=%s opcode=%u]\n", msg.key, msg.opcode));
nsDependentCString lockNameStr(msg.key);
nsCStringHashKey hashKey(&lockNameStr);
nsresult *status;
mResultMap.Get(hashKey.GetKey(), &status);
nsresult status;
if (msg.opcode == IPC_LOCK_OP_STATUS_ACQUIRED) if (msg.opcode == IPC_LOCK_OP_STATUS_ACQUIRED)
status = NS_OK; *status = NS_OK;
else else
status = NS_ERROR_FAILURE; *status = NS_ERROR_FAILURE;
// handle synchronous waiting case first
if (mSyncLockName) {
if (strcmp(mSyncLockName, msg.key) == 0) {
mSyncLockStatus = status;
return NS_OK;
}
return IPC_WAIT_NEXT_MESSAGE;
}
// otherwise, this is an asynchronous notification
NotifyComplete(msg.key, status);
return NS_OK; return NS_OK;
} }
void
ipcLockService::NotifyComplete(const char *lockName, nsresult status)
{
nsCStringKey hashKey(lockName);
nsISupports *obj = mPendingTable.Get(&hashKey); // ADDREFS
if (obj) {
nsCOMPtr<ipcILockNotify> notify = do_QueryInterface(obj);
NS_RELEASE(obj);
if (notify)
notify->OnAcquireLockComplete(lockName, status);
}
}

Просмотреть файл

@ -1,3 +1,4 @@
/* vim:set ts=4 sw=4 sts=4 et cindent: */
/* ***** BEGIN LICENSE BLOCK ***** /* ***** BEGIN LICENSE BLOCK *****
* Version: MPL 1.1/GPL 2.0/LGPL 2.1 * Version: MPL 1.1/GPL 2.0/LGPL 2.1
* *
@ -42,7 +43,14 @@
#include "ipcList.h" #include "ipcList.h"
#include "ipcdclient.h" #include "ipcdclient.h"
#include "nsCOMPtr.h" #include "nsCOMPtr.h"
#include "nsHashtable.h" #include "nsDataHashtable.h"
#include "nsHashKeys.h"
//-----------------------------------------------------------------------------
typedef nsDataHashtableMT<nsCStringHashKey, nsresult*> ipcLockResultMap;
//-----------------------------------------------------------------------------
class ipcLockService : public ipcILockService class ipcLockService : public ipcILockService
, public ipcIMessageObserver , public ipcIMessageObserver
@ -52,20 +60,14 @@ public:
NS_DECL_IPCILOCKSERVICE NS_DECL_IPCILOCKSERVICE
NS_DECL_IPCIMESSAGEOBSERVER NS_DECL_IPCIMESSAGEOBSERVER
ipcLockService() : mSyncLockName(nsnull) {}
~ipcLockService() { IPC_Shutdown(); }
NS_HIDDEN_(nsresult) Init(); NS_HIDDEN_(nsresult) Init();
private: private:
NS_HIDDEN_(void) NotifyComplete(const char *lockName, nsresult status); // maps lockname to the address of a nsresult, which will be assigned a
// value once a STATUS event is received.
// map from lockname to locknotify for pending notifications ipcLockResultMap mResultMap;
nsSupportsHashtable mPendingTable;
// if non-null, then this is the name of the lock we are trying to
// synchronously acquire.
const char *mSyncLockName;
nsresult mSyncLockStatus;
}; };
//-----------------------------------------------------------------------------
#endif // !ipcLockService_h__ #endif // !ipcLockService_h__

Просмотреть файл

@ -40,6 +40,7 @@
#include "ipcModuleUtil.h" #include "ipcModuleUtil.h"
#include "ipcLockProtocol.h" #include "ipcLockProtocol.h"
#include "plhash.h" #include "plhash.h"
#include "plstr.h"
static const nsID kLockTargetID = IPC_LOCK_TARGETID; static const nsID kLockTargetID = IPC_LOCK_TARGETID;
@ -75,6 +76,42 @@ struct ipcLockContext
, mNextPending(NULL) {} , mNextPending(NULL) {}
}; };
//-----------------------------------------------------------------------------
PR_STATIC_CALLBACK(void *)
ipcLockModule_AllocTable(void *pool, PRSize size)
{
return malloc(size);
}
PR_STATIC_CALLBACK(void)
ipcLockModule_FreeTable(void *pool, void *item)
{
free(item);
}
PR_STATIC_CALLBACK(PLHashEntry *)
ipcLockModule_AllocEntry(void *pool, const void *key)
{
return (PLHashEntry *) malloc(sizeof(PLHashEntry));
}
PR_STATIC_CALLBACK(void)
ipcLockModule_FreeEntry(void *pool, PLHashEntry *he, PRUintn flag)
{
PL_strfree((char *) he->key);
free(he);
}
static const PLHashAllocOps ipcLockModule_AllocOps = {
ipcLockModule_AllocTable,
ipcLockModule_FreeTable,
ipcLockModule_AllocEntry,
ipcLockModule_FreeEntry
};
//-----------------------------------------------------------------------------
static void static void
ipcLockModule_AcquireLock(PRUint32 cid, PRUint8 flags, const char *key) ipcLockModule_AcquireLock(PRUint32 cid, PRUint8 flags, const char *key)
{ {
@ -113,7 +150,7 @@ ipcLockModule_AcquireLock(PRUint32 cid, PRUint8 flags, const char *key)
if (!ctx) if (!ctx)
return; return;
PL_HashTableAdd(gLockTable, key, ctx); PL_HashTableAdd(gLockTable, PL_strdup(key), ctx);
ipcLockModule_Send(cid, key, IPC_LOCK_OP_STATUS_ACQUIRED); ipcLockModule_Send(cid, key, IPC_LOCK_OP_STATUS_ACQUIRED);
} }
@ -187,8 +224,16 @@ PR_STATIC_CALLBACK(PRIntn)
ipcLockModule_ReleaseByCID(PLHashEntry *he, PRIntn i, void *arg) ipcLockModule_ReleaseByCID(PLHashEntry *he, PRIntn i, void *arg)
{ {
PRUint32 cid = *(PRUint32 *) arg; PRUint32 cid = *(PRUint32 *) arg;
ipcLockModule_ReleaseLockHelper(cid, (const char *) he->key,
(ipcLockContext *) he->value); printf("$$$ ipcLockModule_ReleaseByCID [cid=%u key=%s he=%p]\n", cid, (char*)he->key, (void*)he);
ipcLockContext *ctx = (ipcLockContext *) he->value;
if (ctx->mOwnerID != cid)
return HT_ENUMERATE_NEXT;
if (ipcLockModule_ReleaseLockHelper(cid, (const char *) he->key, ctx))
return HT_ENUMERATE_REMOVE;
return HT_ENUMERATE_NEXT; return HT_ENUMERATE_NEXT;
} }
@ -203,7 +248,7 @@ ipcLockModule_Init()
PL_HashString, PL_HashString,
PL_CompareStrings, PL_CompareStrings,
PL_CompareValues, PL_CompareValues,
NULL, &ipcLockModule_AllocOps,
NULL); NULL);
} }

Просмотреть файл

@ -97,8 +97,6 @@ tmTransactionService::~tmTransactionService() {
if (qmap) if (qmap)
delete qmap; delete qmap;
} }
IPC_Shutdown();
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -116,11 +114,7 @@ tmTransactionService::Init(const nsACString & aNamespace) {
nsresult rv; nsresult rv;
rv = IPC_Init(); rv = IPC_DefineTarget(kTransModuleID, this, PR_TRUE);
if (NS_FAILED(rv))
return rv;
rv = IPC_DefineTarget(kTransModuleID, this);
if (NS_FAILED(rv)) if (NS_FAILED(rv))
return rv; return rv;
@ -189,7 +183,7 @@ tmTransactionService::Attach(const nsACString & aDomainName,
// acquire a lock if neccessary // acquire a lock if neccessary
if (aLockingCall) if (aLockingCall)
lockService->AcquireLock(joinedQueueName, nsnull, PR_TRUE); lockService->AcquireLock(joinedQueueName, PR_TRUE);
// XXX need to handle lock failures // XXX need to handle lock failures
if (NS_SUCCEEDED(trans.Init(0, // no IPC client if (NS_SUCCEEDED(trans.Init(0, // no IPC client
@ -224,7 +218,7 @@ tmTransactionService::Flush(const nsACString & aDomainName,
PRBool aLockingCall) { PRBool aLockingCall) {
// acquire a lock if neccessary // acquire a lock if neccessary
if (aLockingCall) if (aLockingCall)
lockService->AcquireLock(GetJoinedQueueName(aDomainName), nsnull, PR_TRUE); lockService->AcquireLock(GetJoinedQueueName(aDomainName), PR_TRUE);
// synchronous flush // synchronous flush
nsresult rv = SendDetachOrFlush(GetQueueID(aDomainName), TM_FLUSH, PR_TRUE); nsresult rv = SendDetachOrFlush(GetQueueID(aDomainName), TM_FLUSH, PR_TRUE);

Просмотреть файл

@ -50,7 +50,9 @@
// T *mNext; // T *mNext;
// }; // };
// //
// objects added to the list must be allocated with operator new. // objects added to the list must be allocated with operator new. class T may
// optionally inherit from ipcListNode<T> if it doesn't wish to define mNext
// explicitly.
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
template<class T> template<class T>
@ -184,4 +186,13 @@ protected:
T *mTail; T *mTail;
}; };
template<class T>
class ipcListNode
{
public:
ipcListNode() : mNext(nsnull) {}
T *mNext;
};
#endif // !ipcList_h__ #endif // !ipcList_h__

Просмотреть файл

@ -169,6 +169,7 @@ myIpcClientQueryHandler::OnQueryComplete(PRUint32 aQueryID,
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
#if 0
class myIpcLockNotify : public ipcILockNotify class myIpcLockNotify : public ipcILockNotify
{ {
public: public:
@ -185,6 +186,7 @@ myIpcLockNotify::OnAcquireLockComplete(const char *lockName, nsresult status)
gIpcLockServ->ReleaseLock(lockName); gIpcLockServ->ReleaseLock(lockName);
return NS_OK; return NS_OK;
} }
#endif
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -306,10 +308,10 @@ int main(int argc, char **argv)
RETURN_IF_FAILED(rv, "do_GetService(ipcLockServ)"); RETURN_IF_FAILED(rv, "do_GetService(ipcLockServ)");
NS_ADDREF(gIpcLockServ = lockService); NS_ADDREF(gIpcLockServ = lockService);
nsCOMPtr<ipcILockNotify> notify(new myIpcLockNotify()); //nsCOMPtr<ipcILockNotify> notify(new myIpcLockNotify());
gIpcLockServ->AcquireLock("blah", notify, PR_TRUE); gIpcLockServ->AcquireLock("blah", PR_TRUE);
rv = gIpcLockServ->AcquireLock("foo", nsnull, PR_TRUE); rv = gIpcLockServ->AcquireLock("foo", PR_TRUE);
printf("*** sync AcquireLock returned [rv=%x]\n", rv); printf("*** sync AcquireLock returned [rv=%x]\n", rv);
PLEvent *ev; PLEvent *ev;