Fix for 100353 -- handle polling for read and write on different threads, and properly deal with sending to a socket that has flow control restrictions. Fixes a problem saving to IMAP server sent messages with attachments. r=gordon, wtc.

This commit is contained in:
sfraser%netscape.com 2001-10-09 02:12:40 +00:00
Родитель 0bc3b0d4a5
Коммит efece6ef56
2 изменённых файлов: 173 добавлений и 131 удалений

Просмотреть файл

@ -120,11 +120,10 @@ struct _MDFileDesc {
/* Server sockets: listen bit tells the notifier func what to do */
PRBool doListen;
_MDSocketCallerInfo misc;
_MDSocketCallerInfo read;
_MDSocketCallerInfo write;
_MDSocketCallerInfo poll;
};
/*

Просмотреть файл

@ -305,92 +305,89 @@ WakeUpNotifiedThread(PRThread *thread, OTResult result)
// Notification routine
// Async callback routine.
// A5 is OK. Cannot allocate memory here
// Ref: http://gemma.apple.com/techpubs/mac/NetworkingOT/NetworkingWOT-100.html
//
static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResult result, void * cookie)
{
PRFilePrivate *secret = (PRFilePrivate *) contextPtr;
_MDFileDesc * md = &(secret->md);
EndpointRef endpoint = (EndpointRef)secret->md.osfd;
PRThread * thread = NULL;
PRThread * pollThread = md->poll.thread;
OSStatus err;
OTResult resultOT;
TDiscon discon;
PRFilePrivate *secret = (PRFilePrivate *) contextPtr;
_MDFileDesc * md = &(secret->md);
EndpointRef endpoint = (EndpointRef)secret->md.osfd;
PRThread * readThread = NULL; // also used for 'misc'
PRThread * writeThread = NULL;
OSStatus err;
OTResult resultOT;
TDiscon discon;
switch (code)
{
// OTLook Events -
case T_LISTEN: // A connection request is available
// If md->doListen is true, then PR_Listen has been
// called on this endpoint; therefore, we're ready to
// accept connections. But we'll do that with PR_Accept
// (which calls OTListen, OTAccept, etc) instead of
// doing it here.
if (md->doListen) {
thread = secret->md.misc.thread;
secret->md.misc.thread = NULL;
secret->md.misc.cookie = cookie;
break;
} else {
// Reject the connection, we're not listening
OTSndDisconnect(endpoint, NULL);
}
// If md->doListen is true, then PR_Listen has been
// called on this endpoint; therefore, we're ready to
// accept connections. But we'll do that with PR_Accept
// (which calls OTListen, OTAccept, etc) instead of
// doing it here.
if (md->doListen) {
readThread = secret->md.misc.thread;
secret->md.misc.thread = NULL;
secret->md.misc.cookie = cookie;
break;
} else {
// Reject the connection, we're not listening
OTSndDisconnect(endpoint, NULL);
}
break;
case T_CONNECT: // Confirmation of a connect request
// cookie = sndCall parameter from OTConnect()
// cookie = sndCall parameter from OTConnect()
err = OTRcvConnect(endpoint, NULL);
PR_ASSERT(err == kOTNoError);
// wake up waiting thread, if any
thread = secret->md.write.thread;
// wake up waiting thread, if any.
writeThread = secret->md.write.thread;
secret->md.write.thread = NULL;
secret->md.write.cookie = cookie;
secret->md.write.cookie = cookie;
break;
case T_DATA: // Standard data is available
// Mark this socket as readable.
secret->md.readReady = PR_TRUE;
// Mark this socket as readable.
secret->md.readReady = PR_TRUE;
// wake up waiting thread, if any
thread = secret->md.read.thread;
// wake up waiting thread, if any
readThread = secret->md.read.thread;
secret->md.read.thread = NULL;
secret->md.read.cookie = cookie;
break;
break;
case T_EXDATA: // Expedited data is available
PR_ASSERT(!"T_EXDATA Not implemented");
return;
return;
case T_DISCONNECT: // A disconnect is available
discon.udata.len = 0;
err = OTRcvDisconnect(endpoint, &discon);
PR_ASSERT(err == kOTNoError);
secret->md.exceptReady = PR_TRUE;
secret->md.exceptReady = PR_TRUE; // XXX Check this
// wake up waiting threads, if any
result = -3199 - discon.reason; // obtain the negative error code
// wake up waiting threads, if any
result = -3199 - discon.reason; // obtain the negative error code
if ((readThread = secret->md.read.thread) != NULL) {
secret->md.read.thread = NULL;
secret->md.read.cookie = cookie;
}
if ((thread = secret->md.read.thread) != NULL) {
secret->md.read.thread = NULL;
secret->md.read.cookie = cookie;
WakeUpNotifiedThread(thread, result);
}
if ((thread = secret->md.write.thread) != NULL) {
secret->md.write.thread = NULL;
secret->md.write.cookie = cookie;
WakeUpNotifiedThread(thread, result);
}
thread = NULL; // already took care of notification here
if ((writeThread = secret->md.write.thread) != NULL) {
secret->md.write.thread = NULL;
secret->md.write.cookie = cookie;
}
break;
case T_ERROR: // obsolete/unused in library
PR_ASSERT(!"T_ERROR Not implemented");
return;
return;
case T_UDERR: // UDP Send error; clear the error
(void) OTRcvUDErr((EndpointRef) cookie, NULL);
(void) OTRcvUDErr((EndpointRef) cookie, NULL);
break;
case T_ORDREL: // An orderly release is available
@ -398,28 +395,27 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul
PR_ASSERT(err == kOTNoError);
secret->md.readReady = PR_TRUE; // mark readable (to emulate bsd sockets)
// remember connection is closed, so we can return 0 on read or receive
secret->md.orderlyDisconnect = PR_TRUE;
thread = secret->md.read.thread;
secret->md.read.thread = NULL;
secret->md.read.cookie = cookie;
secret->md.orderlyDisconnect = PR_TRUE;
readThread = secret->md.read.thread;
secret->md.read.thread = NULL;
secret->md.read.cookie = cookie;
break;
case T_GODATA: // Flow control lifted on standard data
secret->md.writeReady = PR_TRUE;
resultOT = OTLook(endpoint); // clear T_GODATA event
PR_ASSERT(resultOT == T_GODATA);
// wake up waiting thread, if any
thread = secret->md.write.thread;
resultOT = OTLook(endpoint); // clear T_GODATA event
PR_ASSERT(resultOT == T_GODATA);
// wake up waiting thread, if any
writeThread = secret->md.write.thread;
secret->md.write.thread = NULL;
secret->md.write.cookie = cookie;
break;
case T_GOEXDATA: // Flow control lifted on expedited data
PR_ASSERT(!"T_GOEXDATA Not implemented");
return;
return;
case T_REQUEST: // An Incoming request is available
PR_ASSERT(!"T_REQUEST Not implemented");
@ -430,13 +426,13 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul
return;
case T_PASSCON: // State is now T_DATAXFER
// OTAccept() complete, receiving endpoint in T_DATAXFER state
// cookie = OTAccept() resRef parameter
break;
// OTAccept() complete, receiving endpoint in T_DATAXFER state
// cookie = OTAccept() resRef parameter
break;
case T_RESET: // Protocol has been reset
PR_ASSERT(!"T_RESET Not implemented");
return;
return;
// Async Completion Events
case T_BINDCOMPLETE:
@ -444,39 +440,39 @@ static pascal void NotifierRoutine(void * contextPtr, OTEventCode code, OTResul
case T_ACCEPTCOMPLETE:
case T_OPTMGMTCOMPLETE:
case T_GETPROTADDRCOMPLETE:
thread = secret->md.misc.thread;
readThread = secret->md.misc.thread;
secret->md.misc.thread = NULL;
secret->md.misc.cookie = cookie;
break;
// case T_OPENCOMPLETE: // we open endpoints in synchronous mode
// case T_OPENCOMPLETE: // we open endpoints in synchronous mode
// case T_REPLYCOMPLETE:
// case T_DISCONNECTCOMPLETE: // we don't call OTSndDisconnect()
// case T_DISCONNECTCOMPLETE: // we don't call OTSndDisconnect()
// case T_RESOLVEADDRCOMPLETE:
// case T_GETINFOCOMPLETE:
// case T_SYNCCOMPLETE:
// case T_MEMORYRELEASED: // only if OTAckSends() called on endpoint
// case T_MEMORYRELEASED: // only if OTAckSends() called on endpoint
// case T_REGNAMECOMPLETE:
// case T_DELNAMECOMPLETE:
// case T_LKUPNAMECOMPLETE:
// case T_LKUPNAMERESULT:
// OpenTptInternet.h
// case T_DNRSTRINGTOADDRCOMPLETE: // DNS is handled by dnsContext in DNSNotifierRoutine()
// OpenTptInternet.h
// case T_DNRSTRINGTOADDRCOMPLETE: // DNS is handled by dnsContext in DNSNotifierRoutine()
// case T_DNRADDRTONAMECOMPLETE:
// case T_DNRSYSINFOCOMPLETE:
// case T_DNRMAILEXCHANGECOMPLETE:
// case T_DNRQUERYCOMPLETE:
default:
// we should probably have a bit more sophisticated handling of kOTSystemSleep, etc.
// PR_ASSERT(code != 0);
// we should probably have a bit more sophisticated handling of kOTSystemSleep, etc.
// PR_ASSERT(code != 0);
return;
}
if (pollThread)
WakeUpNotifiedThread(pollThread, kOTNoError);
if (readThread)
WakeUpNotifiedThread(readThread, result);
if (thread && (thread != pollThread))
WakeUpNotifiedThread(thread, result);
if (writeThread && (writeThread != readThread))
WakeUpNotifiedThread(writeThread, result);
}
@ -488,8 +484,8 @@ static OSErr CreateSocket(int type, EndpointRef *endpoint)
OTConfiguration *config;
EndpointRef ep;
// for now we just create the endpoint
// we'll make it asynchronous and give it a notifier routine in _MD_makenonblock()
// for now we just create the endpoint
// we'll make it asynchronous and give it a notifier routine in _MD_makenonblock()
switch (type){
case SOCK_STREAM: configName = kTCPName; break;
@ -519,7 +515,7 @@ PRInt32 _MD_socket(int domain, int type, int protocol)
OSStatus err;
EndpointRef endpoint;
_MD_FinishInitNetAccess();
_MD_FinishInitNetAccess();
// We only deal with internet domain
if (domain != AF_INET) {
@ -1349,7 +1345,8 @@ PRInt32 _MD_connect(PRFileDesc *fd, PRNetAddr *addr, PRUint32 addrlen, PRInterva
sndCall.addr.buf = (UInt8*) addr;
if (!fd->secret->nonblocking) {
PrepareForAsyncCompletion(me, fd->secret->md.osfd);
PrepareForAsyncCompletion(me, fd->secret->md.osfd);
PR_ASSERT(fd->secret->md.write.thread == NULL);
fd->secret->md.write.thread = me;
}
@ -1407,7 +1404,10 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
err = kEFAULTErr;
goto ErrorExit;
}
PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL :
fd->secret->md.read.thread == NULL);
while (bytesLeft > 0)
{
Boolean disabledNotifications = OTEnterNotifier(endpoint);
@ -1416,7 +1416,6 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
if (opCode == kSTREAM_SEND) {
do {
fd->secret->md.write.thread = me;
fd->secret->md.writeReady = PR_FALSE; // expect the worst
result = OTSnd(endpoint, buf, bytesLeft, NULL);
@ -1500,8 +1499,10 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
if (result > 0) {
buf = (void *) ( (UInt32) buf + (UInt32)result );
bytesLeft -= result;
if (opCode == kSTREAM_RECEIVE)
return result;
if (opCode == kSTREAM_RECEIVE) {
amount = result;
goto NormalExit;
}
} else {
switch (result) {
case kOTLookErr:
@ -1513,8 +1514,15 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
case kEAGAINErr:
case kEWOULDBLOCKErr:
if (fd->secret->nonblocking) {
err = result;
goto ErrorExit;
if (bytesLeft == amount) { // no data was sent
err = result;
goto ErrorExit;
}
// some data was sent
amount -= bytesLeft;
goto NormalExit;
}
WaitOnThisThread(me, timeout);
@ -1524,8 +1532,11 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
break;
case kOTOutStateErr: // if provider already closed, fall through to handle error
if (fd->secret->md.orderlyDisconnect)
return 0;
if (fd->secret->md.orderlyDisconnect) {
amount = 0;
goto NormalExit;
}
// else fall through
default:
err = result;
goto ErrorExit;
@ -1533,30 +1544,31 @@ static PRInt32 SendReceiveStream(PRFileDesc *fd, void *buf, PRInt32 amount,
}
}
PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == nil :
fd->secret->md.read.thread == nil);
NormalExit:
PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL :
fd->secret->md.read.thread == NULL);
return amount;
ErrorExit:
PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == nil :
fd->secret->md.read.thread == nil);
PR_ASSERT(opCode == kSTREAM_SEND ? fd->secret->md.write.thread == NULL :
fd->secret->md.read.thread == NULL);
macsock_map_error(err);
return -1;
}
}
PRInt32 _MD_recv(PRFileDesc *fd, void *buf, PRInt32 amount,
PRIntn flags, PRIntervalTime timeout)
{
return (SendReceiveStream(fd, buf, amount, flags, timeout, kSTREAM_RECEIVE));
}
}
PRInt32 _MD_send(PRFileDesc *fd,const void *buf, PRInt32 amount,
PRIntn flags, PRIntervalTime timeout)
{
return (SendReceiveStream(fd, (void *)buf, amount, flags, timeout, kSTREAM_SEND));
}
}
// Errors:
@ -1638,7 +1650,7 @@ static PRInt32 SendReceiveDgram(PRFileDesc *fd, void *buf, PRInt32 amount,
ErrorExit:
macsock_map_error(err);
return -1;
}
}
PRInt32 _MD_recvfrom(PRFileDesc *fd, void *buf, PRInt32 amount,
@ -1647,7 +1659,7 @@ PRInt32 _MD_recvfrom(PRFileDesc *fd, void *buf, PRInt32 amount,
{
return (SendReceiveDgram(fd, buf, amount, flags, addr, addrlen,
timeout, kDGRAM_RECEIVE));
}
}
PRInt32 _MD_sendto(PRFileDesc *fd,const void *buf, PRInt32 amount,
@ -1656,7 +1668,7 @@ PRInt32 _MD_sendto(PRFileDesc *fd,const void *buf, PRInt32 amount,
{
return (SendReceiveDgram(fd, (void *)buf, amount, flags, addr, &addrlen,
timeout, kDGRAM_SEND));
}
}
PRInt32 _MD_closesocket(PRInt32 osfd)
@ -1683,7 +1695,7 @@ PRInt32 _MD_closesocket(PRInt32 osfd)
ErrorExit:
macsock_map_error(err);
return -1;
}
}
PRInt32 _MD_writev(PRFileDesc *fd, const struct PRIOVec *iov, PRInt32 iov_size, PRIntervalTime timeout)
@ -1693,9 +1705,11 @@ PRInt32 _MD_writev(PRFileDesc *fd, const struct PRIOVec *iov, PRInt32 iov_size,
PR_ASSERT(0);
_PR_MD_CURRENT_THREAD()->md.osErrCode = unimpErr;
return -1;
}
}
// OT endpoint states are documented here:
// http://gemma.apple.com/techpubs/mac/NetworkingOT/NetworkingWOT-27.html#MARKER-9-65
//
static PRBool GetState(PRFileDesc *fd, PRBool *readReady, PRBool *writeReady, PRBool *exceptReady)
{
OTResult resultOT;
@ -1706,14 +1720,32 @@ static PRBool GetState(PRFileDesc *fd, PRBool *readReady, PRBool *writeReady, PR
OTCountDataBytes((EndpointRef)fd->secret->md.osfd, &availableData);
*readReady = fd->secret->md.readReady && (availableData > 0);
*exceptReady = fd->secret->md.exceptReady;
*exceptReady = fd->secret->md.exceptReady;
resultOT = OTGetEndpointState((EndpointRef)fd->secret->md.osfd);
switch (resultOT) {
case T_DATAXFER:
case T_INREL:
*writeReady = PR_TRUE;
switch (resultOT) {
case T_IDLE:
case T_UNBND:
// the socket is not connected. Emulating BSD sockets,
// we mark it readable and writable. The next PR_Read
// or PR_Write will then fail. Usually, in this situation,
// fd->secret->md.exceptReady is also set, and returned if
// anyone is polling for it.
*readReady = PR_FALSE;
*writeReady = PR_FALSE;
break;
case T_DATAXFER: // data transfer
*writeReady = fd->secret->md.writeReady;
break;
case T_INREL: // incoming orderly release
*writeReady = fd->secret->md.writeReady;
break;
case T_OUTCON: // outgoing connection pending
case T_INCON: // incoming connection pending
case T_OUTREL: // outgoing orderly release
default:
*writeReady = PR_FALSE;
}
@ -1811,7 +1843,24 @@ static void SetDescPollThread(PRPollDesc *pds, PRIntn npds, PRThread* thread)
PRFileDesc *bottomFD = PR_GetIdentitiesLayer(pd->fd, PR_NSPR_IO_LAYER);
if (bottomFD && (_PR_FILEDESC_OPEN == bottomFD->secret->state))
{
bottomFD->secret->md.poll.thread = thread;
if (pd->in_flags & PR_POLL_READ) {
PR_ASSERT(thread == NULL || bottomFD->secret->md.read.thread == NULL);
bottomFD->secret->md.read.thread = thread;
}
if (pd->in_flags & PR_POLL_WRITE) {
// it's possible for the writing thread to be non-null during
// a non-blocking connect, so we assert that we're on
// the same thread, or the thread is null.
// Note that it's strictly possible for the connect and poll
// to be on different threads, so ideally we need to assert
// that if md.write.thread is non-null, there is a non-blocking
// connect in progress.
PR_ASSERT(thread == NULL ||
(bottomFD->secret->md.write.thread == NULL ||
bottomFD->secret->md.write.thread == thread));
bottomFD->secret->md.write.thread = thread;
}
}
}
}
@ -1822,9 +1871,8 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout)
PRThread *thread = _PR_MD_CURRENT_THREAD();
intn is;
PRInt32 ready;
OSErr result;
if (timeout == PR_INTERVAL_NO_WAIT) {
if (timeout == PR_INTERVAL_NO_WAIT) {
return CheckPollDescs(pds, npds);
}
@ -1835,7 +1883,7 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout)
// need to set up the thread
PrepareForAsyncCompletion(thread, 0);
SetDescPollThread(pds, npds, thread);
SetDescPollThread(pds, npds, thread);
ready = CheckPollDescs(pds, npds);
PR_Unlock(thread->md.asyncIOLock);
@ -1843,13 +1891,8 @@ PRInt32 _MD_poll(PRPollDesc *pds, PRIntn npds, PRIntervalTime timeout)
if (ready == 0) {
WaitOnThisThread(thread, timeout);
result = thread->md.osErrCode;
if (result != noErr && result != kETIMEDOUTErr) {
PR_ASSERT(0); /* debug: catch unexpected errors */
ready = -1;
} else {
ready = CheckPollDescs(pds, npds);
}
ready = CheckPollDescs(pds, npds);
} else {
thread->io_pending = PR_FALSE;
}
@ -1937,7 +1980,7 @@ PR_IMPLEMENT(PRInt32) _MD_shutdown(PRFileDesc *fd, PRIntn how)
/* Just succeed silently!!! */
return (0);
}
}
PR_IMPLEMENT(PRStatus)
@ -1987,7 +2030,7 @@ _MD_getpeername(PRFileDesc *fd, PRNetAddr *addr, PRUint32 *addrlen)
ErrorExit:
macsock_map_error(err);
return PR_FAILURE;
}
}
PR_IMPLEMENT(unsigned long) inet_addr(const char *cp)
@ -1995,7 +2038,7 @@ PR_IMPLEMENT(unsigned long) inet_addr(const char *cp)
OSStatus err;
InetHost host;
_MD_FinishInitNetAccess();
_MD_FinishInitNetAccess();
err = OTInetStringToHost((char*) cp, &host);
if (err != kOTNoError)
@ -2067,7 +2110,7 @@ PR_IMPLEMENT(struct hostent *) gethostbyaddr(const void *addr, int addrlen, int
PR_IMPLEMENT(char *) inet_ntoa(struct in_addr addr)
{
_MD_FinishInitNetAccess();
_MD_FinishInitNetAccess();
OTInetHostToString((InetHost)addr.s_addr, sHostInfo.name);
@ -2080,7 +2123,7 @@ PRStatus _MD_gethostname(char *name, int namelen)
OSStatus err;
InetInterfaceInfo info;
_MD_FinishInitNetAccess();
_MD_FinishInitNetAccess();
/*
* On a Macintosh, we don't have the concept of a local host name.
@ -2164,8 +2207,8 @@ int _MD_mac_get_nonblocking_connect_error(PRInt32 osfd)
case T_IDLE:
return -1;
case T_INREL:
macsock_map_error(ENOTCONN);
return -1;
macsock_map_error(ENOTCONN);
return -1;
default:
PR_ASSERT(0);
return -1;