Fix for bug 121951 -- make PR_Poll on Mac work with layered file descriptors. r=wtc, r=gordon

This commit is contained in:
sfraser%netscape.com 2002-02-19 01:43:15 +00:00
Родитель 855f941a23
Коммит 1a13143b79
1 изменённых файлов: 199 добавлений и 118 удалений

Просмотреть файл

@ -1086,12 +1086,12 @@ typedef struct RawEndpointAndThread
// A5 is OK. Cannot allocate memory here
static pascal void RawEndpointNotifierRoutine(void * contextPtr, OTEventCode code, OTResult result, void * cookie)
{
RawEndpointAndThread *endthr = (RawEndpointAndThread *) contextPtr;
RawEndpointAndThread *endthr = (RawEndpointAndThread *) contextPtr;
PRThread * thread = endthr->thread;
EndpointRef * endpoint = endthr->endpoint;
_PRCPU * cpu = _PR_MD_CURRENT_CPU();
OSStatus err;
OTResult resultOT;
OSStatus err;
OTResult resultOT;
switch (code)
{
@ -1317,7 +1317,7 @@ PRInt32 _MD_connect(PRFileDesc *fd, PRNetAddr *addr, PRUint32 addrlen, PRInterva
err = kEFAULTErr;
goto ErrorExit;
}
// Bind to a local port; let the system assign it.
bindAddr.inet.family = AF_INET;
@ -1328,19 +1328,19 @@ PRInt32 _MD_connect(PRFileDesc *fd, PRNetAddr *addr, PRUint32 addrlen, PRInterva
bindReq.addr.buf = (UInt8*) &bindAddr;
bindReq.qlen = 0;
PR_Lock(fd->secret->md.miscLock);
PR_Lock(fd->secret->md.miscLock);
PrepareForAsyncCompletion(me, fd->secret->md.osfd);
fd->secret->md.misc.thread = me;
fd->secret->md.misc.thread = me;
err = OTBind(endpoint, &bindReq, NULL);
if (err != kOTNoError) {
me->io_pending = PR_FALSE;
PR_Unlock(fd->secret->md.miscLock);
goto ErrorExit;
}
me->io_pending = PR_FALSE;
PR_Unlock(fd->secret->md.miscLock);
goto ErrorExit;
}
WaitOnThisThread(me, PR_INTERVAL_NO_TIMEOUT);
PR_Unlock(fd->secret->md.miscLock);
PR_Unlock(fd->secret->md.miscLock);
err = me->md.osErrCode;
if (err != kOTNoError)
@ -1352,26 +1352,26 @@ PRInt32 _MD_connect(PRFileDesc *fd, PRNetAddr *addr, PRUint32 addrlen, PRInterva
sndCall.addr.len = addrlen;
sndCall.addr.buf = (UInt8*) addr;
if (!fd->secret->nonblocking) {
PrepareForAsyncCompletion(me, fd->secret->md.osfd);
PR_ASSERT(fd->secret->md.write.thread == NULL);
fd->secret->md.write.thread = me;
if (!fd->secret->nonblocking) {
PrepareForAsyncCompletion(me, fd->secret->md.osfd);
PR_ASSERT(fd->secret->md.write.thread == NULL);
fd->secret->md.write.thread = me;
}
err = OTConnect (endpoint, &sndCall, NULL);
if (err == kOTNoError) {
PR_ASSERT(!"OTConnect returned kOTNoError in async mode!?!");
}
if (fd->secret->nonblocking) {
if (err == kOTNoDataErr)
err = EINPROGRESS;
goto ErrorExit;
} else {
if (err != kOTNoError && err != kOTNoDataErr) {
me->io_pending = PR_FALSE;
goto ErrorExit;
}
}
if (err == kOTNoError) {
PR_ASSERT(!"OTConnect returned kOTNoError in async mode!?!");
}
if (fd->secret->nonblocking) {
if (err == kOTNoDataErr)
err = EINPROGRESS;
goto ErrorExit;
} else {
if (err != kOTNoError && err != kOTNoDataErr) {
me->io_pending = PR_FALSE;
goto ErrorExit;
}
}
WaitOnThisThread(me, timeout);
@ -1761,83 +1761,126 @@ static PRBool GetState(PRFileDesc *fd, PRBool *readReady, PRBool *writeReady, PR
}
// check to see if any of the poll descriptors have data available
// for reading or writing.
static PRInt32 CheckPollDescs(PRPollDesc *pds, PRIntn npds)
// for reading or writing, by calling their poll methods (layered IO).
static PRInt32 CheckPollDescMethods(PRPollDesc *pds, PRIntn npds, PRInt16 *outReadFlags, PRInt16 *outWriteFlags)
{
PRInt32 ready = 0;
PRPollDesc *pd, *epd;
PRInt32 ready = 0;
PRPollDesc *pd, *epd;
PRInt16 *readFlag, *writeFlag;
for (pd = pds, epd = pd + npds; pd < epd; pd++)
for (pd = pds, epd = pd + npds, readFlag = outReadFlags, writeFlag = outWriteFlags;
pd < epd;
pd++, readFlag++, writeFlag++)
{
PRInt16 in_flags_read = 0, in_flags_write = 0;
PRInt16 out_flags_read = 0, out_flags_write = 0;
if (NULL == pd->fd || pd->in_flags == 0) continue;
if (pd->in_flags & PR_POLL_READ)
{
PRInt16 in_flags_read = 0, in_flags_write = 0;
PRInt16 out_flags_read = 0, out_flags_write = 0;
if (NULL == pd->fd || pd->in_flags == 0) continue;
if (pd->in_flags & PR_POLL_READ)
{
in_flags_read = (pd->fd->methods->poll)(
pd->fd, pd->in_flags & ~PR_POLL_WRITE, &out_flags_read);
}
if (pd->in_flags & PR_POLL_WRITE)
{
in_flags_write = (pd->fd->methods->poll)(
pd->fd, pd->in_flags & ~PR_POLL_READ, &out_flags_write);
}
if ((0 != (in_flags_read & out_flags_read))
|| (0 != (in_flags_write & out_flags_write)))
{
ready += 1; /* some layer has buffer input */
pd->out_flags = out_flags_read | out_flags_write;
}
else
{
PRFileDesc *bottomFD;
PRBool readReady, writeReady, exceptReady;
pd->out_flags = 0; /* pre-condition */
bottomFD = PR_GetIdentitiesLayer(pd->fd, PR_NSPR_IO_LAYER);
/* bottomFD can be NULL for pollable sockets */
if (bottomFD)
{
if (_PR_FILEDESC_OPEN == bottomFD->secret->state)
{
if (GetState(bottomFD, &readReady, &writeReady, &exceptReady))
{
if (readReady)
{
if (in_flags_read & PR_POLL_READ)
pd->out_flags |= PR_POLL_READ;
if (in_flags_write & PR_POLL_READ)
pd->out_flags |= PR_POLL_WRITE;
}
if (writeReady)
{
if (in_flags_read & PR_POLL_WRITE)
pd->out_flags |= PR_POLL_READ;
if (in_flags_write & PR_POLL_WRITE)
pd->out_flags |= PR_POLL_WRITE;
}
if (exceptReady && (pd->in_flags & PR_POLL_EXCEPT))
{
pd->out_flags |= PR_POLL_EXCEPT;
}
if (0 != pd->out_flags) ready++;
}
}
else /* bad state */
{
ready += 1; /* this will cause an abrupt return */
pd->out_flags = PR_POLL_NVAL; /* bogii */
}
}
}
in_flags_read = (pd->fd->methods->poll)(
pd->fd, pd->in_flags & ~PR_POLL_WRITE, &out_flags_read);
}
if (pd->in_flags & PR_POLL_WRITE)
{
in_flags_write = (pd->fd->methods->poll)(
pd->fd, pd->in_flags & ~PR_POLL_READ, &out_flags_write);
}
if ((0 != (in_flags_read & out_flags_read)) ||
(0 != (in_flags_write & out_flags_write)))
{
ready += 1; /* some layer has buffer input */
pd->out_flags = out_flags_read | out_flags_write;
}
*readFlag = in_flags_read;
*writeFlag = in_flags_write;
}
return ready;
}
// set or clear md.poll.thread on the poll descriptors
// check to see if any of OT endpoints of the poll descriptors have data available
// for reading or writing.
static PRInt32 CheckPollDescEndpoints(PRPollDesc *pds, PRIntn npds, const PRInt16 *inReadFlags, const PRInt16 *inWriteFlags)
{
PRInt32 ready = 0;
PRPollDesc *pd, *epd;
const PRInt16 *readFlag, *writeFlag;
for (pd = pds, epd = pd + npds, readFlag = inReadFlags, writeFlag = inWriteFlags;
pd < epd;
pd++, readFlag++, writeFlag++)
{
PRFileDesc *bottomFD;
PRBool readReady, writeReady, exceptReady;
PRInt16 in_flags_read = *readFlag;
PRInt16 in_flags_write = *writeFlag;
if (NULL == pd->fd || pd->in_flags == 0) continue;
bottomFD = PR_GetIdentitiesLayer(pd->fd, PR_NSPR_IO_LAYER);
/* bottomFD can be NULL for pollable sockets */
if (bottomFD)
{
if (_PR_FILEDESC_OPEN == bottomFD->secret->state)
{
pd->out_flags = 0; /* pre-condition */
if (GetState(bottomFD, &readReady, &writeReady, &exceptReady))
{
if (readReady)
{
if (in_flags_read & PR_POLL_READ)
pd->out_flags |= PR_POLL_READ;
if (in_flags_write & PR_POLL_READ)
pd->out_flags |= PR_POLL_WRITE;
}
if (writeReady)
{
if (in_flags_read & PR_POLL_WRITE)
pd->out_flags |= PR_POLL_READ;
if (in_flags_write & PR_POLL_WRITE)
pd->out_flags |= PR_POLL_WRITE;
}
if (exceptReady && (pd->in_flags & PR_POLL_EXCEPT))
{
pd->out_flags |= PR_POLL_EXCEPT;
}
if (0 != pd->out_flags) ready++;
}
}
else /* bad state */
{
ready += 1; /* this will cause an abrupt return */
pd->out_flags = PR_POLL_NVAL; /* bogii */
}
}
}
return ready;
}
// see how many of the poll descriptors are ready
static PRInt32 CountReadyPollDescs(PRPollDesc *pds, PRIntn npds)
{
PRInt32 ready = 0;
PRPollDesc *pd, *epd;
for (pd = pds, epd = pd + npds; pd < epd; pd++)
{
if (pd->out_flags)
ready ++;
}
return ready;
}
// set or clear the poll thread on the poll descriptors
static void SetDescPollThread(PRPollDesc *pds, PRIntn npds, PRThread* thread)
{
PRInt32 ready = 0;
@ -1869,43 +1912,81 @@ static void SetDescPollThread(PRPollDesc *pds, PRIntn npds, PRThread* thread)
bottomFD->secret->md.write.thread = thread;
}
}
}
}
}
}
#define DESCRIPTOR_FLAGS_ARRAY_SIZE 32
PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout)
{
PRThread *thread = _PR_MD_CURRENT_THREAD();
intn is;
PRInt32 ready;
PRInt16 readFlagsArray[DESCRIPTOR_FLAGS_ARRAY_SIZE];
PRInt16 writeFlagsArray[DESCRIPTOR_FLAGS_ARRAY_SIZE];
if (timeout == PR_INTERVAL_NO_WAIT) {
return CheckPollDescs(pds, npds);
PRInt16 *readFlags = readFlagsArray;
PRInt16 *writeFlags = writeFlagsArray;
PRInt16 *ioFlags = NULL;
PRThread *thread = _PR_MD_CURRENT_THREAD();
PRInt32 ready;
intn is;
PRBool needsToWait = PR_FALSE;
if (npds > DESCRIPTOR_FLAGS_ARRAY_SIZE)
{
// we allocate a single double-size array. The first half is used
// for read flags, and the second half for write flags.
ioFlags = (PRInt16*)PR_Malloc(sizeof(PRInt16) * npds * 2);
if (!ioFlags)
{
PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
return -1;
}
readFlags = ioFlags;
writeFlags = &ioFlags[npds];
}
(void)CheckPollDescMethods(pds, npds, readFlags, writeFlags);
_PR_INTSOFF(is);
PR_Lock(thread->md.asyncIOLock);
// ensure that we don't miss the firing of the notifier while checking socket status
// need to set up the thread
PrepareForAsyncCompletion(thread, 0);
SetDescPollThread(pds, npds, thread);
ready = CheckPollDescs(pds, npds);
(void)CheckPollDescEndpoints(pds, npds, readFlags, writeFlags);
ready = CountReadyPollDescs(pds, npds);
if ((ready == 0) && (timeout != PR_INTERVAL_NO_WAIT))
{
PrepareForAsyncCompletion(thread, 0);
SetDescPollThread(pds, npds, thread);
needsToWait = PR_TRUE;
}
PR_Unlock(thread->md.asyncIOLock);
_PR_FAST_INTSON(is);
_PR_INTSON(is);
if (ready == 0) {
if (needsToWait)
{
WaitOnThisThread(thread, timeout);
ready = CheckPollDescs(pds, npds);
} else {
// since we may have been woken by a pollable event
// firing, we have to check both poll methods and
// endpoints.
(void)CheckPollDescMethods(pds, npds, readFlags, writeFlags);
(void)CheckPollDescEndpoints(pds, npds, readFlags, writeFlags);
ready = CountReadyPollDescs(pds, npds);
SetDescPollThread(pds, npds, NULL);
}
else {
thread->io_pending = PR_FALSE;
}
SetDescPollThread(pds, npds, NULL);
if (readFlags != readFlagsArray)
PR_Free(ioFlags);
return ready;
}