From 5ebe43461d802177cf2652f482c02c4806ad30cc Mon Sep 17 00:00:00 2001 From: "rpotts%netscape.com" Date: Thu, 3 Jun 1999 05:45:23 +0000 Subject: [PATCH] Added support for testing Sync transport access and Suspending and resuming transports... --- netwerk/test/TestSocketTransport.cpp | 267 ++++++++++++++++++++++----- 1 file changed, 217 insertions(+), 50 deletions(-) diff --git a/netwerk/test/TestSocketTransport.cpp b/netwerk/test/TestSocketTransport.cpp index 0b9ac0b482fa..7446556f24f2 100644 --- a/netwerk/test/TestSocketTransport.cpp +++ b/netwerk/test/TestSocketTransport.cpp @@ -18,6 +18,7 @@ #include #ifdef WIN32 +#define USE_TIMERS // Only use nsITimer on Windows (for now...) #include #endif @@ -32,9 +33,17 @@ #include "nsIInputStream.h" #include "nsIByteBufferInputStream.h" #include "nsIThread.h" +#include "nsITimer.h" #include "nsCRT.h" +// Forward declarations... +class TestConnection; +class InputConsumer; +class OutputObserver; + + + static NS_DEFINE_CID(kSocketTransportServiceCID, NS_SOCKETTRANSPORTSERVICE_CID); static NS_DEFINE_CID(kEventQueueServiceCID, NS_EVENTQUEUESERVICE_CID); static NS_DEFINE_IID(kEventQueueCID, NS_EVENTQUEUE_CID); @@ -43,14 +52,16 @@ static PRTime gElapsedTime; static int gKeepRunning = 1; static nsIEventQueue* gEventQ = nsnull; -class TestConnection; -class InputConsumer; -class OutputObserver; +#define NUM_TEST_THREADS 5 + +static TestConnection* gConnections[NUM_TEST_THREADS]; +static nsIThread* gThreads[NUM_TEST_THREADS]; +static nsITimer* gPeriodicTimer; class TestConnection : public nsIRunnable { public: - TestConnection(const char* aHostName, PRInt32 aPort); + TestConnection(const char* aHostName, PRInt32 aPort, PRBool aAsyncFlag); ~TestConnection(); // nsISupports interface... @@ -65,6 +76,9 @@ public: nsresult Process(void); + nsresult Suspend(void); + nsresult Resume(void); + void Lock(void) { PR_EnterMonitor(mMonitor); } void Notify(void) { PR_Notify(mMonitor); } void Wait(void) { PR_Wait(mMonitor, PR_INTERVAL_NO_TIMEOUT); } @@ -76,12 +90,15 @@ public: protected: nsIByteBufferInputStream* mStream; + nsIInputStream* mInStream; + nsIOutputStream* mOutStream; InputConsumer* mInputConsumer; OutputObserver* mOutputObserver; nsITransport* mTransport; + PRBool mIsAsync; PRInt32 mBufferLength; char mBufferChar; @@ -181,6 +198,9 @@ InputConsumer::OnStopBinding(nsISupports* context, nsIString* aMsg) { printf("\n+++ InputConsumer::OnStopBinding (status = %x) +++. Context = %p\n", aStatus, context); + mConnection->Lock(); + mConnection->Notify(); + mConnection->Unlock(); return NS_OK; } @@ -242,12 +262,12 @@ OutputObserver::OnStopBinding(nsISupports* context, nsresult aStatus, nsIString* aMsg) { - mConnection->Lock(); +/// mConnection->Lock(); printf("\n+++ OutputObserver::OnStopBinding (status = %x) +++. Context = %p\n", aStatus, context); - mConnection->Notify(); +/// mConnection->Notify(); - mConnection->Unlock(); +/// mConnection->Unlock(); return NS_OK; } @@ -260,25 +280,27 @@ OutputObserver::OnStopBinding(nsISupports* context, // // ----- -TestConnection::TestConnection(const char* aHostName, PRInt32 aPort) +TestConnection::TestConnection(const char* aHostName, PRInt32 aPort, PRBool aAsyncFlag) { nsresult rv; NS_INIT_REFCNT(); + mIsAsync = aAsyncFlag; mBufferLength = 255; - mBufferChar = 'a'; + mBufferChar = 'a'; mTransport = nsnull; mStream = nsnull; - mMonitor = nsnull; - // Create the input and output stream observers... - mInputConsumer = new InputConsumer(this); - mOutputObserver = new OutputObserver(this); + mInStream = nsnull; + mOutStream = nsnull; - NS_IF_ADDREF(mInputConsumer); - NS_IF_ADDREF(mOutputObserver); + mInputConsumer = nsnull; + mOutputObserver = nsnull; + + // Create the monitor used for synchronization... + mMonitor = PR_NewMonitor(); // Create a socket transport... NS_WITH_SERVICE(nsISocketTransportService, sts, kSocketTransportServiceCID, &rv); @@ -286,22 +308,41 @@ TestConnection::TestConnection(const char* aHostName, PRInt32 aPort) rv = sts->CreateTransport(aHostName, aPort, &mTransport); } - // Create a stream for the data being written to the server... if (NS_SUCCEEDED(rv)) { - rv = NS_NewByteBufferInputStream(&mStream); - } + if (mIsAsync) { + // Create the input and output stream observers... + mInputConsumer = new InputConsumer(this); + mOutputObserver = new OutputObserver(this); - mMonitor = PR_NewMonitor(); + NS_IF_ADDREF(mInputConsumer); + NS_IF_ADDREF(mOutputObserver); + + // Create a stream for the data being written to the server... + if (NS_SUCCEEDED(rv)) { + rv = NS_NewByteBufferInputStream(&mStream); + } + } + // Synchronous transport... + else { + rv = mTransport->OpenInputStream (&mInStream); + rv = mTransport->OpenOutputStream(&mOutStream); + } + } } TestConnection::~TestConnection() { + NS_IF_RELEASE(mTransport); + // Async resources... NS_IF_RELEASE(mInputConsumer); NS_IF_RELEASE(mOutputObserver); - NS_IF_RELEASE(mTransport); NS_IF_RELEASE(mStream); + // Sync resources... + NS_IF_RELEASE(mInStream); + NS_IF_RELEASE(mOutStream); + if (mMonitor) { PR_DestroyMonitor(mMonitor); } @@ -312,14 +353,37 @@ NS_IMPL_ISUPPORTS(TestConnection,nsIRunnable::GetIID()); NS_IMETHODIMP TestConnection::Run(void) { - nsresult rv = NS_OK; + nsresult rv; + + // + // Make sure that all resources were allocated in the constructor... + // + if (!mTransport) { + rv = NS_ERROR_FAILURE; + } + if (mIsAsync && (!mInputConsumer || !mOutputObserver)) { + rv = NS_ERROR_FAILURE; + } + + if (NS_SUCCEEDED(rv)) { + if (mIsAsync) { + // + // Initiate an async read... + // + rv = mTransport->AsyncRead(mTransport, gEventQ, mInputConsumer); + + if (NS_FAILED(rv)) { + printf("Error: AsyncRead failed..."); + } + } - if (mInputConsumer && mOutputObserver && mTransport && mStream) { Lock(); - ReadBuffer(); - while (1) { - WriteBuffer(); - Wait(); + while (NS_SUCCEEDED(rv)) { + rv = WriteBuffer(); + + if (NS_SUCCEEDED(rv)) { + rv = ReadBuffer(); + } if (mBufferChar == 'z') { mBufferChar = 'a'; @@ -330,6 +394,7 @@ TestConnection::Run(void) Unlock(); } + printf("Transport thread exiting...\n"); return rv; } @@ -338,21 +403,43 @@ nsresult TestConnection::WriteBuffer(void) { nsresult rv; char *buffer; + PRInt32 size; PRUint32 bytesWritten; - int i; // Create and fill a test buffer of data... buffer = (char*)PR_Malloc(mBufferLength + 4); if (buffer) { - for (i=0; iFill(buffer, strlen(buffer), &bytesWritten); + // + // Async case... + // + if (mStream) { + rv = mStream->Fill(buffer, size, &bytesWritten); + + // Write the buffer to the server... + if (NS_SUCCEEDED(rv)) { + rv = mTransport->AsyncWrite(mStream, mTransport, gEventQ, /* mOutputObserver */ nsnull); + } + // Wait for the write to complete... + if (NS_SUCCEEDED(rv)) { + Wait(); + } else { + printf("Error: AsyncWrite failed..."); + } + } + // + // Synchronous case... + // + else if (mOutStream) { + rv = mOutStream->Write(buffer, size, &bytesWritten); + } printf("\n+++ Request is: %c. Context = %p\n", mBufferChar, mTransport); PR_Free(buffer); @@ -360,26 +447,28 @@ nsresult TestConnection::WriteBuffer(void) rv = NS_ERROR_OUT_OF_MEMORY; } - // Write the buffer to the server... - if (NS_SUCCEEDED(rv)) { - rv = mTransport->AsyncWrite(mStream, mTransport, gEventQ, mOutputObserver); - } - if (NS_FAILED(rv)) { - printf("Error: AsyncWrite failed..."); - } - return rv; } nsresult TestConnection::ReadBuffer(void) { - nsresult rv; + nsresult rv = NS_OK; - rv = mTransport->AsyncRead(mTransport, gEventQ, mInputConsumer); + // + // Synchronous case... + // + if (mInStream) { + char *buffer; + PRUint32 bytesRead; - if (NS_FAILED(rv)) { - printf("Error: AsyncRead failed..."); + buffer = (char*)PR_Malloc(mBufferLength + 4); + + if (buffer) { + rv = mInStream->Read(buffer, mBufferLength+2, &bytesRead); + + PR_Free(buffer); + } } return rv; @@ -391,6 +480,74 @@ nsresult TestConnection::Process(void) } +nsresult TestConnection::Suspend(void) +{ + nsresult rv; + + if (mTransport) { + rv = mTransport->Suspend(); + } else { + rv = NS_ERROR_FAILURE; + } + + return rv; +} + +nsresult TestConnection::Resume(void) +{ + nsresult rv; + + if (mTransport) { + rv = mTransport->Resume(); + } else { + rv = NS_ERROR_FAILURE; + } + + return rv; +} + + + + + + +#if defined(USE_TIMERS) + +void TimerCallback(nsITimer* aTimer, void* aClosure) +{ + static PRBool flag = PR_FALSE; + int i; + + if (flag) { + printf("Resuming connections...\n"); + } else { + printf("Suspending connections...\n"); + } + + + for (i=0; iResume(); + } else { + connection->Suspend(); + } + } + } + flag = !flag; + + NS_RELEASE(gPeriodicTimer); + + if (NS_OK == NS_NewTimer(&gPeriodicTimer)) { + gPeriodicTimer->Init(TimerCallback, nsnull, 1000*5); + } +} + +#endif /* USE_TIMERS */ + int main(int argc, char* argv[]) @@ -430,16 +587,26 @@ main(int argc, char* argv[]) eventQService->GetThreadEventQueue(PR_CurrentThread(), &gEventQ); - TestConnection* connections[5]; - nsIThread* threads[5]; int i; - for (i=0; i<5; i++) { - connections[i] = new TestConnection("chainsaw", 7); - rv = NS_NewThread(&threads[i], connections[i]); + // + // Create the connections and threads... + // + for (i=0; iInit(TimerCallback, nsnull, 1000); + } +#endif /* USE_TIMERS */ + // Enter the message pump to allow the URL load to proceed. while ( gKeepRunning ) { #ifdef WIN32