diff --git a/netwerk/test/httpserver/httpd.js b/netwerk/test/httpserver/httpd.js index 7446519c4c72..e4562b78f1cb 100644 --- a/netwerk/test/httpserver/httpd.js +++ b/netwerk/test/httpserver/httpd.js @@ -1,4 +1,4 @@ -/* -*- Mode: Java; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* -*- Mode: JavaScript; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim:set ts=2 sw=2 sts=2 et: */ /* ***** BEGIN LICENSE BLOCK ***** * Version: MPL 1.1/GPL 2.0/LGPL 2.1 @@ -227,12 +227,6 @@ function getRootPrefBranch() const ServerSocket = CC("@mozilla.org/network/server-socket;1", "nsIServerSocket", "init"); -const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", - "nsIBinaryInputStream", - "setInputStream"); -const BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1", - "nsIBinaryOutputStream", - "setOutputStream"); const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1", "nsIScriptableInputStream", "init"); @@ -250,6 +244,13 @@ const WritablePropertyBag = CC("@mozilla.org/hash-property-bag;1", const SupportsString = CC("@mozilla.org/supports-string;1", "nsISupportsString"); +/* These two are non-const only so a test can overwrite them. */ +var BinaryInputStream = CC("@mozilla.org/binaryinputstream;1", + "nsIBinaryInputStream", + "setInputStream"); +var BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1", + "nsIBinaryOutputStream", + "setOutputStream"); /** * Returns the RFC 822/1123 representation of a date. @@ -423,7 +424,7 @@ nsHttpServer.prototype = { var input = trans.openInputStream(0, SEGMENT_SIZE, SEGMENT_COUNT) .QueryInterface(Ci.nsIAsyncInputStream); - var output = trans.openOutputStream(Ci.nsITransport.OPEN_BLOCKING, 0, 0); + var output = trans.openOutputStream(0, 0, 0); } catch (e) { @@ -1289,7 +1290,7 @@ RequestReader.prototype = } catch (e) { - if (e.result !== Cr.NS_BASE_STREAM_CLOSED) + if (streamClosed(e)) { dumpn("*** WARNING: unexpected error when reading from socket; will " + "be treated as if the input stream had been closed"); @@ -3401,8 +3402,10 @@ function Response(connection) this._bodyInputStream = null; /** - * The stream copier which copies data written to the body by a request - * handler to the network. + * A stream copier which copies data to the network. It is initially null + * until replaced with a copier for response headers; when headers have been + * fully sent it is replaced with a copier for the response body, remaining + * so for the duration of response processing. */ this._asyncCopier = null; @@ -3440,7 +3443,7 @@ Response.prototype = if (!this._bodyOutputStream) { - var pipe = new Pipe(false, false, Response.SEGMENT_SIZE, PR_UINT32_MAX, + var pipe = new Pipe(true, false, Response.SEGMENT_SIZE, PR_UINT32_MAX, null); this._bodyOutputStream = pipe.outputStream; this._bodyInputStream = pipe.inputStream; @@ -3740,8 +3743,8 @@ Response.prototype = // the connection will be deterministically written. This makes it easier // to specify exact behavior, and it makes observable behavior more // predictable for clients. Note that the correctness of this depends on - // callbacks in response to _waitForData in WriteThroughCopier happening - // asynchronously with respect to the actual writing of data to + // callbacks in response to _waitToReadData in WriteThroughCopier + // happening asynchronously with respect to the actual writing of data to // bodyOutputStream, as they currently do; if they happened synchronously, // an event which ran before this one could write more data to the // response body before we get around to canceling the copier. We have @@ -3781,6 +3784,37 @@ Response.prototype = // PRIVATE IMPLEMENTATION + /** + * Sends the status line and headers of this response if they haven't been + * sent and initiates the process of copying data written to this response's + * body to the network. + */ + _startAsyncProcessor: function() + { + dumpn("*** _startAsyncProcessor()"); + + // Handle cases where we're being called a second time. The former case + // happens when this is triggered both by complete() and by processAsync(), + // while the latter happens when processAsync() in conjunction with sent + // data causes abort() to be called. + if (this._asyncCopier || this._ended) + { + dumpn("*** ignoring second call to _startAsyncProcessor"); + return; + } + + // Send headers if they haven't been sent already and should be sent, then + // asynchronously continue to send the body. + if (this._headers && !this._powerSeized) + { + this._sendHeaders(); + return; + } + + this._headers = null; + this._sendBody(); + }, + /** * Signals that all modifications to the response status line and headers are * complete and then sends that data over the network to the client. Once @@ -3830,7 +3864,7 @@ Response.prototype = dumpn("*** header post-processing completed, sending response head..."); // request-line - var preamble = statusLine; + var preambleData = [statusLine]; // headers var headEnum = headers.enumerator; @@ -3841,71 +3875,82 @@ Response.prototype = .data; var values = headers.getHeaderValues(fieldName); for (var i = 0, sz = values.length; i < sz; i++) - preamble += fieldName + ": " + values[i] + "\r\n"; + preambleData.push(fieldName + ": " + values[i] + "\r\n"); } // end request-line/headers - preamble += "\r\n"; + preambleData.push("\r\n"); - var connection = this._connection; - try - { - connection.output.write(preamble, preamble.length); - } - catch (e) - { - // Connection closed already? Even if not, failure to write the response - // means we probably will fail later anyway, so in the interests of - // avoiding exceptions we'll (possibly) close the connection and return. - dumpn("*** error writing headers to socket: " + e); - response.end(); - return; - } + var preamble = preambleData.join(""); + + var responseHeadPipe = new Pipe(true, false, 0, PR_UINT32_MAX, null); + responseHeadPipe.outputStream.write(preamble, preamble.length); + + var response = this; + var copyObserver = + { + onStartRequest: function(request, cx) + { + dumpn("*** preamble copying started"); + }, + + onStopRequest: function(request, cx, statusCode) + { + dumpn("*** preamble copying complete " + + "[status=0x" + statusCode.toString(16) + "]"); + + if (!Components.isSuccessCode(statusCode)) + { + dumpn("!!! header copying problems: non-success statusCode, " + + "ending response"); + + response.end(); + } + else + { + response._sendBody(); + } + }, + + QueryInterface: function(aIID) + { + if (aIID.equals(Ci.nsIRequestObserver) || aIID.equals(Ci.nsISupports)) + return this; + + throw Cr.NS_ERROR_NO_INTERFACE; + } + }; + + var headerCopier = this._asyncCopier = + new WriteThroughCopier(responseHeadPipe.inputStream, + this._connection.output, + copyObserver, null); + + responseHeadPipe.outputStream.close(); // Forbid setting any more headers or modifying the request line. this._headers = null; }, /** - * Sends the status line and headers of this response if they haven't been - * sent and initiates the process of copying data written to this response's - * body to the network. + * Asynchronously writes the body of the response (or the entire response, if + * seizePower() has been called) to the network. */ - _startAsyncProcessor: function() + _sendBody: function() { - dumpn("*** _startAsyncProcessor()"); + dumpn("*** _sendBody"); - // Handle cases where we're being called a second time. The former case - // happens when this is triggered both by complete() and by processAsync(), - // while the latter happens when processAsync() in conjunction with sent - // data causes abort() to be called. - if (this._asyncCopier || this._ended) - { - dumpn("*** ignoring second call to _startAsyncProcessor"); - return; - } - - // Send headers if they haven't been sent already. - if (this._headers) - { - if (this._powerSeized) - this._headers = null; - else - this._sendHeaders(); - NS_ASSERT(this._headers === null, "_sendHeaders() failed?"); - } - - var response = this; - var connection = this._connection; + NS_ASSERT(!this._headers, "still have headers around but sending body?"); // If no body data was written, we're done if (!this._bodyInputStream) { dumpn("*** empty body, response finished"); - response.end(); + this.end(); return; } + var response = this; var copyObserver = { onStartRequest: function(request, context) @@ -3932,8 +3977,7 @@ Response.prototype = QueryInterface: function(aIID) { - if (aIID.equals(Ci.nsIRequestObserver) || - aIID.equals(Ci.nsISupports)) + if (aIID.equals(Ci.nsIRequestObserver) || aIID.equals(Ci.nsISupports)) return this; throw Cr.NS_ERROR_NO_INTERFACE; @@ -3941,7 +3985,7 @@ Response.prototype = }; dumpn("*** starting async copier of body data..."); - var copier = this._asyncCopier = + this._asyncCopier = new WriteThroughCopier(this._bodyInputStream, this._connection.output, copyObserver, null); }, @@ -3965,30 +4009,45 @@ function notImplemented() throw Cr.NS_ERROR_NOT_IMPLEMENTED; } +/** Returns true iff the given exception represents stream closure. */ +function streamClosed(e) +{ + return e === Cr.NS_BASE_STREAM_CLOSED || + (typeof e === "object" && e.result === Cr.NS_BASE_STREAM_CLOSED); +} + +/** Returns true iff the given exception represents a blocked stream. */ +function wouldBlock(e) +{ + return e === Cr.NS_BASE_STREAM_WOULD_BLOCK || + (typeof e === "object" && e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK); +} + /** - * Copies data from input to output as it becomes available. + * Copies data from source to sink as it becomes available, when that data can + * be written to sink without blocking. * - * @param input : nsIAsyncInputStream + * @param source : nsIAsyncInputStream * the stream from which data is to be read - * @param output : nsIOutputStream + * @param sink : nsIAsyncOutputStream * the stream to which data is to be copied * @param observer : nsIRequestObserver * an observer which will be notified when the copy starts and finishes * @param context : nsISupports * context passed to observer when notified of start/stop * @throws NS_ERROR_NULL_POINTER - * if input, output, or observer are null + * if source, sink, or observer are null */ -function WriteThroughCopier(input, output, observer, context) +function WriteThroughCopier(source, sink, observer, context) { - if (!input || !output || !observer) + if (!source || !sink || !observer) throw Cr.NS_ERROR_NULL_POINTER; /** Stream from which data is being read. */ - this._input = input; + this._source = source; /** Stream to which data is being written. */ - this._output = new BinaryOutputStream(output); + this._sink = sink; /** Observer watching this copy. */ this._observer = observer; @@ -3996,7 +4055,16 @@ function WriteThroughCopier(input, output, observer, context) /** Context for the observer watching this. */ this._context = context; - /** False until cancel() is called, when this copy is completed. */ + /** + * True iff this is currently being canceled (cancel has been called, the + * callback may not yet have been made). + */ + this._canceled = false; + + /** + * False until all data has been read from input and written to output, at + * which point this copy is completed and cancel() is asynchronously called. + */ this._completed = false; /** Required by nsIRequest, meaningless. */ @@ -4009,64 +4077,303 @@ function WriteThroughCopier(input, output, observer, context) /** Status of this request. */ this.status = Cr.NS_OK; + /** Arrays of byte strings waiting to be written to output. */ + this._pendingData = []; + // start copying try { observer.onStartRequest(this, context); - this._waitForData(); + this._waitToReadData(); + this._waitForSinkClosure(); } catch (e) { - dumpn("!!! error starting copy: " + e); + dumpn("!!! error starting copy: " + e + + ("lineNumber" in e ? ", line " + e.lineNumber : "")); + dumpn(e.stack); this.cancel(Cr.NS_ERROR_UNEXPECTED); } } WriteThroughCopier.prototype = { - /** - * Cancels data copying and asynchronously notifies the observer with the - * given error code. - * - * @param status : nsresult - * the status to pass to the observer when data copying has been canceled - */ - cancel: function(status) - { - dumpn("*** cancel(" + status.toString(16) + ")"); + /* nsISupports implementation */ - if (this._completed) + QueryInterface: function(iid) + { + if (iid.equals(Ci.nsIInputStreamCallback) || + iid.equals(Ci.nsIOutputStreamCallback) || + iid.equals(Ci.nsIRequest) || + iid.equals(Ci.nsISupports)) { - dumpn("*** ignoring cancel on already-canceled copier..."); + return this; + } + + throw Cr.NS_ERROR_NO_INTERFACE; + }, + + + // NSIINPUTSTREAMCALLBACK + + /** + * Receives a more-data-in-input notification and writes the corresponding + * data to the output. + * + * @param input : nsIAsyncInputStream + * the input stream on whose data we have been waiting + */ + onInputStreamReady: function(input) + { + if (this._source === null) + return; + + dumpn("*** onInputStreamReady"); + + // + // Ordinarily we'll read a non-zero amount of data from input, queue it up + // to be written and then wait for further callbacks. The complications in + // this method are the cases where we deviate from that behavior when errors + // occur or when copying is drawing to a finish. + // + // The edge cases when reading data are: + // + // Zero data is read + // If zero data was read, we're at the end of available data, so we can + // should stop reading and move on to writing out what we have (or, if + // we've already done that, onto notifying of completion). + // A stream-closed exception is thrown + // This is effectively a less kind version of zero data being read; the + // only difference is that we notify of completion with that result + // rather than with NS_OK. + // Some other exception is thrown + // This is the least kind result. We don't know what happened, so we + // act as though the stream closed except that we notify of completion + // with the result NS_ERROR_UNEXPECTED. + // + + var bytesWanted = 0, bytesConsumed = -1; + try + { + input = new BinaryInputStream(input); + + bytesWanted = Math.min(input.available(), Response.SEGMENT_SIZE); + dumpn("*** input wanted: " + bytesWanted); + + if (bytesWanted > 0) + { + var data = input.readByteArray(bytesWanted); + bytesConsumed = data.length; + this._pendingData.push(String.fromCharCode.apply(String, data)); + } + + dumpn("*** " + bytesConsumed + " bytes read"); + + // Handle the zero-data edge case in the same place as all other edge + // cases are handled. + if (bytesWanted === 0) + throw Cr.NS_BASE_STREAM_CLOSED; + } + catch (e) + { + if (streamClosed(e)) + { + dumpn("*** input stream closed"); + e = bytesWanted === 0 ? Cr.NS_OK : Cr.NS_ERROR_UNEXPECTED; + } + else + { + dumpn("!!! unexpected error reading from input, canceling: " + e); + e = Cr.NS_ERROR_UNEXPECTED; + } + + this._doneReadingSource(e); return; } - this._completed = true; - this.status = status; + var pendingData = this._pendingData; - var self = this; - var cancelEvent = - { - run: function() - { - dumpn("*** onStopRequest async callback"); - try - { - self._observer.onStopRequest(self, self._context, self.status); - } - catch (e) - { - NS_ASSERT(false, "how are we throwing an exception here? " + e); - } - } - }; - gThreadManager.currentThread - .dispatch(cancelEvent, Ci.nsIThread.DISPATCH_NORMAL); + NS_ASSERT(bytesConsumed > 0); + NS_ASSERT(pendingData.length > 0, "no pending data somehow?"); + NS_ASSERT(pendingData[pendingData.length - 1].length > 0, + "buffered zero bytes of data?"); + + NS_ASSERT(this._source !== null); + + // Reading has gone great, and we've gotten data to write now. What if we + // don't have a place to write that data, because output went away just + // before this read? Drop everything on the floor, including new data, and + // cancel at this point. + if (this._sink === null) + { + pendingData.length = 0; + this._doneReadingSource(Cr.NS_ERROR_UNEXPECTED); + return; + } + + // Okay, we've read the data, and we know we have a place to write it. We + // need to queue up the data to be written, but *only* if none is queued + // already -- if data's already queued, the code that actually writes the + // data will make sure to wait on unconsumed pending data. + try + { + if (pendingData.length === 1) + this._waitToWriteData(); + } + catch (e) + { + dumpn("!!! error waiting to write data just read, swallowing and " + + "writing only what we already have: " + e); + this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED); + return; + } + + // Whee! We successfully read some data, and it's successfully queued up to + // be written. All that remains now is to wait for more data to read. + try + { + this._waitToReadData(); + } + catch (e) + { + dumpn("!!! error waiting to read more data: " + e); + this._doneReadingSource(Cr.NS_ERROR_UNEXPECTED); + } }, + + // NSIOUTPUTSTREAMCALLBACK + /** - * Returns true if the provided input hasn't been fully consumed and cancel() - * hasn't been called. + * Callback when data may be written to the output stream without blocking, or + * when the output stream has been closed. + * + * @param output : nsIAsyncOutputStream + * the output stream on whose writability we've been waiting, also known as + * this._sink */ + onOutputStreamReady: function(output) + { + if (this._sink === null) + return; + + dumpn("*** onOutputStreamReady"); + + var pendingData = this._pendingData; + if (pendingData.length === 0) + { + // There's no pending data to write. The only way this can happen is if + // we're waiting on the output stream's closure, so we can respond to a + // copying failure as quickly as possible (rather than waiting for data to + // be available to read and then fail to be copied). Therefore, we must + // be done now -- don't bother to attempt to write anything and wrap + // things up. + dumpn("!!! output stream closed prematurely, ending copy"); + + this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED); + return; + } + + + NS_ASSERT(pendingData[0].length > 0, "queued up an empty quantum?"); + + // + // Write out the first pending quantum of data. The possible errors here + // are: + // + // The write might fail because we can't write that much data + // Okay, we've written what we can now, so re-queue what's left and + // finish writing it out later. + // The write failed because the stream was closed + // Discard pending data that we can no longer write, stop reading, and + // signal that copying finished. + // Some other error occurred. + // Same as if the stream were closed, but notify with the status + // NS_ERROR_UNEXPECTED so the observer knows something was wonky. + // + + try + { + var quantum = pendingData[0]; + + // XXX |quantum| isn't guaranteed to be ASCII, so we're relying on + // undefined behavior! We're only using this because writeByteArray + // is unusably broken for asynchronous output streams; see bug 532834 + // for details. + var bytesWritten = output.write(quantum, quantum.length); + if (bytesWritten === quantum.length) + pendingData.shift(); + else + pendingData[0] = quantum.substring(bytesWritten); + + dumpn("*** wrote " + bytesWritten + " bytes of data"); + } + catch (e) + { + if (wouldBlock(e)) + { + NS_ASSERT(pendingData.length > 0, + "stream-blocking exception with no data to write?"); + NS_ASSERT(pendingData[0].length > 0, + "stream-blocking exception with empty quantum?"); + this._waitToWriteData(); + return; + } + + if (streamClosed(e)) + dumpn("!!! output stream prematurely closed, signaling error..."); + else + dumpn("!!! unknown error: " + e + ", quantum=" + quantum); + + this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED); + return; + } + + // The day is ours! Quantum written, now let's see if we have more data + // still to write. + try + { + if (pendingData.length > 0) + { + this._waitToWriteData(); + return; + } + } + catch (e) + { + dumpn("!!! unexpected error waiting to write pending data: " + e); + this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED); + return; + } + + // Okay, we have no more pending data to write -- but might we get more in + // the future? + if (this._source !== null) + { + /* + * If we might, then wait for the output stream to be closed. (We wait + * only for closure because we have no data to write -- and if we waited + * for a specific amount of data, we would get repeatedly notified for no + * reason if over time the output stream permitted more and more data to + * be written to it without blocking.) + */ + this._waitForSinkClosure(); + } + else + { + /* + * On the other hand, if we can't have more data because the input + * stream's gone away, then it's time to notify of copy completion. + * Victory! + */ + this._sink = null; + this._cancelOrDispatchCancelCallback(Cr.NS_OK); + } + }, + + + // NSIREQUEST + + /** Returns true if the cancel observer hasn't been notified yet. */ isPending: function() { return !this._completed; @@ -4078,66 +4385,191 @@ WriteThroughCopier.prototype = resume: notImplemented, /** - * Receives a more-data-in-input notification and writes the corresponding - * data to the output. + * Cancels data reading from input, asynchronously writes out any pending + * data, and causes the observer to be notified with the given error code when + * all writing has finished. + * + * @param status : nsresult + * the status to pass to the observer when data copying has been canceled */ - onInputStreamReady: function(input) + cancel: function(status) { - dumpn("*** onInputStreamReady"); - if (this._completed) + dumpn("*** cancel(" + status.toString(16) + ")"); + + if (this._canceled) { - dumpn("*** ignoring stream-ready callback on a canceled copier..."); + dumpn("*** suppressing a late cancel"); return; } - input = new BinaryInputStream(input); - try - { - var avail = input.available(); - var data = input.readByteArray(avail); - this._output.writeByteArray(data, data.length); - } - catch (e) - { - if (e === Cr.NS_BASE_STREAM_CLOSED || - e.result === Cr.NS_BASE_STREAM_CLOSED) - { - this.cancel(Cr.NS_OK); - } - else - { - dumpn("!!! error copying from input to output: " + e); - this.cancel(Cr.NS_ERROR_UNEXPECTED); - } - return; - } + this._canceled = true; + this.status = status; - if (avail === 0) - this.cancel(Cr.NS_OK); + // We could be in the middle of absolutely anything at this point. Both + // input and output might still be around, we might have pending data to + // write, and in general we know nothing about the state of the world. We + // therefore must assume everything's in progress and take everything to its + // final steady state (or so far as it can go before we need to finish + // writing out remaining data). + + this._doneReadingSource(status); + }, + + + // PRIVATE IMPLEMENTATION + + /** + * Stop reading input if we haven't already done so, passing e as the status + * when closing the stream, and kick off a copy-completion notice if no more + * data remains to be written. + * + * @param e : nsresult + * the status to be used when closing the input stream + */ + _doneReadingSource: function(e) + { + dumpn("*** _doneReadingSource(0x" + e.toString(16) + ")"); + + this._finishSource(e); + if (this._pendingData.length === 0) + this._sink = null; else - this._waitForData(); + NS_ASSERT(this._sink !== null, "null output?"); + + // If we've written out all data read up to this point, then it's time to + // signal completion. + if (this._sink === null) + { + NS_ASSERT(this._pendingData.length === 0, "pending data still?"); + this._cancelOrDispatchCancelCallback(e); + } + }, + + /** + * Stop writing output if we haven't already done so, discard any data that + * remained to be sent, close off input if it wasn't already closed, and kick + * off a copy-completion notice. + * + * @param e : nsresult + * the status to be used when closing input if it wasn't already closed + */ + _doneWritingToSink: function(e) + { + dumpn("*** _doneWritingToSink(0x" + e.toString(16) + ")"); + + this._pendingData.length = 0; + this._sink = null; + this._doneReadingSource(e); + }, + + /** + * Completes processing of this copy: either by canceling the copy if it + * hasn't already been canceled using the provided status, or by dispatching + * the cancel callback event (with the originally provided status, of course) + * if it already has been canceled. + * + * @param status : nsresult + * the status code to use to cancel this, if this hasn't already been + * canceled + */ + _cancelOrDispatchCancelCallback: function(status) + { + dumpn("*** _cancelOrDispatchCancelCallback(" + status + ")"); + + NS_ASSERT(this._source === null, "should have finished input"); + NS_ASSERT(this._sink === null, "should have finished output"); + NS_ASSERT(this._pendingData.length === 0, "should have no pending data"); + + if (!this._canceled) + { + this.cancel(status); + return; + } + + var self = this; + var event = + { + run: function() + { + dumpn("*** onStopRequest async callback"); + + self._completed = true; + try + { + self._observer.onStopRequest(self, self._context, self.status); + } + catch (e) + { + NS_ASSERT(false, + "how are we throwing an exception here? we control " + + "all the callers! " + e); + } + } + }; + + gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); }, /** * Kicks off another wait for more data to be available from the input stream. */ - _waitForData: function() + _waitToReadData: function() { - dumpn("*** _waitForData"); - this._input.asyncWait(this, 0, 1, gThreadManager.mainThread); + dumpn("*** _waitToReadData"); + this._source.asyncWait(this, 0, Response.SEGMENT_SIZE, + gThreadManager.mainThread); }, - /** nsISupports implementation */ - QueryInterface: function(iid) + /** + * Kicks off another wait until data can be written to the output stream. + */ + _waitToWriteData: function() { - if (iid.equals(Ci.nsIRequest) || - iid.equals(Ci.nsISupports) || - iid.equals(Ci.nsIInputStreamCallback)) - { - return this; - } + dumpn("*** _waitToWriteData"); - throw Cr.NS_ERROR_NO_INTERFACE; + var pendingData = this._pendingData; + NS_ASSERT(pendingData.length > 0, "no pending data to write?"); + NS_ASSERT(pendingData[0].length > 0, "buffered an empty write?"); + + this._sink.asyncWait(this, 0, pendingData[0].length, + gThreadManager.mainThread); + }, + + /** + * Kicks off a wait for the sink to which data is being copied to be closed. + * We wait for stream closure when we don't have any data to be copied, rather + * than waiting to write a specific amount of data. We can't wait to write + * data because the sink might be infinitely writable, and if no data appears + * in the source for a long time we might have to spin quite a bit waiting to + * write, waiting to write again, &c. Waiting on stream closure instead means + * we'll get just one notification if the sink dies. Note that when data + * starts arriving from the sink we'll resume waiting for data to be written, + * dropping this closure-only callback entirely. + */ + _waitForSinkClosure: function() + { + dumpn("*** _waitForSinkClosure"); + + this._sink.asyncWait(this, Ci.nsIAsyncOutputStream.WAIT_CLOSURE_ONLY, 0, + gThreadManager.mainThread); + }, + + /** + * Closes input with the given status, if it hasn't already been closed; + * otherwise a no-op. + * + * @param status : nsresult + * status code use to close the source stream if necessary + */ + _finishSource: function(status) + { + dumpn("*** _finishSource(" + status.toString(16) + ")"); + + if (this._source !== null) + { + this._source.closeWithStatus(status); + this._source = null; + } } }; diff --git a/netwerk/test/httpserver/nsIHttpServer.idl b/netwerk/test/httpserver/nsIHttpServer.idl index 8b6c007cf105..08a7396376c7 100644 --- a/netwerk/test/httpserver/nsIHttpServer.idl +++ b/netwerk/test/httpserver/nsIHttpServer.idl @@ -362,28 +362,29 @@ interface nsIHttpServerIdentity : nsISupports interface nsIHttpRequestHandler : nsISupports { /** - * Processes the HTTP request represented by metadata and initializes the - * passed-in response to reflect the correct HTTP response. + * Processes an HTTP request and initializes the passed-in response to reflect + * the correct HTTP response. * * If this method throws an exception, externally observable behavior depends - * upon whether is being processed asynchronously and the connection has had - * any data written to it (even an explicit zero bytes of data being written) - * or whether seizePower() has been called on it. If such has happened, sent - * data will be exactly that data written at the time the exception was - * thrown. If no data has been written, the response has not had seizePower() - * called on it, and it is not being asynchronously created, an error handler - * will be invoked (usually 500 unless otherwise specified). Note that some - * uses of nsIHttpRequestHandler may require this method to never throw an - * exception; in the general case, however, this method may throw an exception - * (causing an HTTP 500 response to occur). + * upon whether is being processed asynchronously. If such is the case, the + * output is some prefix (perhaps all, perhaps none, perhaps only some) of the + * data which would have been sent if, instead, the response had been finished + * at that point. If no data has been written, the response has not had + * seizePower() called on it, and it is not being asynchronously created, an + * error handler will be invoked (usually 500 unless otherwise specified). * - * @param metadata + * Some uses of nsIHttpRequestHandler may require this method to never throw + * an exception; in the general case, however, this method may throw an + * exception (causing an HTTP 500 response to occur, if the above conditions + * are met). + * + * @param request * data representing an HTTP request * @param response * an initially-empty response which must be modified to reflect the data * which should be sent as the response to the request described by metadata */ - void handle(in nsIHttpRequest metadata, in nsIHttpResponse response); + void handle(in nsIHttpRequest request, in nsIHttpResponse response); }; @@ -556,12 +557,6 @@ interface nsIHttpResponse : nsISupports * both in the server socket and in the client may delay written data; be * prepared for delays at any time. * - * @note - * Although in the asynchronous cases writes to the underlying transport - * are not buffered, care must still be taken not to block for too long on - * any such writes; it is even possible for deadlock to occur in the case - * that the server and the client reside in the same process. Write data in - * small chunks if necessary to avoid this problem. * @throws NS_ERROR_NOT_AVAILABLE * if accessed after this response is fully constructed */ diff --git a/netwerk/test/httpserver/test/test_async_response_sending.js b/netwerk/test/httpserver/test/test_async_response_sending.js new file mode 100644 index 000000000000..4b90811e2e71 --- /dev/null +++ b/netwerk/test/httpserver/test/test_async_response_sending.js @@ -0,0 +1,1716 @@ +/* -*- Mode: JavaScript; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim:set ts=2 sw=2 sts=2 et: */ +/* ***** BEGIN LICENSE BLOCK ***** + * Version: MPL 1.1/GPL 2.0/LGPL 2.1 + * + * The contents of this file are subject to the Mozilla Public License Version + * 1.1 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.mozilla.org/MPL/ + * + * Software distributed under the License is distributed on an "AS IS" basis, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License + * for the specific language governing rights and limitations under the + * License. + * + * The Original Code is httpd.js code. + * + * The Initial Developer of the Original Code is + * Mozilla Corporation. + * Portions created by the Initial Developer are Copyright (C) 2009 + * the Initial Developer. All Rights Reserved. + * + * Contributor(s): + * Jeff Walden (original author) + * + * Alternatively, the contents of this file may be used under the terms of + * either the GNU General Public License Version 2 or later (the "GPL"), or + * the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), + * in which case the provisions of the GPL or the LGPL are applicable instead + * of those above. If you wish to allow use of your version of this file only + * under the terms of either the GPL or the LGPL, and not to allow others to + * use your version of this file under the terms of the MPL, indicate your + * decision by deleting the provisions above and replace them with the notice + * and other provisions required by the GPL or the LGPL. If you do not delete + * the provisions above, a recipient may use your version of this file under + * the terms of any one of the MPL, the GPL or the LGPL. + * + * ***** END LICENSE BLOCK ***** */ + +/* + * Ensures that data a request handler writes out in response is sent only as + * quickly as the client can receive it, without racing ahead and being forced + * to block while writing that data. + * + * NB: These tests are extremely tied to the current implementation, in terms of + * when and how stream-ready notifications occur, the amount of data which will + * be read or written at each notification, and so on. If the implementation + * changes in any way with respect to stream copying, this test will probably + * have to change a little at the edges as well. + */ + +gThreadManager = Cc["@mozilla.org/thread-manager;1"].createInstance(); + +function run_test() +{ + do_test_pending(); + tests.push(function testsComplete(_) + { + dumpn("******************\n" + + "* TESTS COMPLETE *\n" + + "******************"); + do_test_finished(); + }); + + runNextTest(); +} + +function runNextTest() +{ + testIndex++; + dumpn("*** runNextTest(), testIndex: " + testIndex); + + try + { + var test = tests[testIndex]; + test(runNextTest); + } + catch (e) + { + var msg = "exception running test " + testIndex + ": " + e; + if (e && "stack" in e) + msg += "\nstack follows:\n" + e.stack; + do_throw(msg); + } +} + + +/************* + * TEST DATA * + *************/ + +const NOTHING = []; + +const FIRST_SEGMENT = [1, 2, 3, 4]; +const SECOND_SEGMENT = [5, 6, 7, 8]; +const THIRD_SEGMENT = [9, 10, 11, 12]; + +const SEGMENT = FIRST_SEGMENT; +const TWO_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8]; +const THREE_SEGMENTS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]; + +const SEGMENT_AND_HALF = [1, 2, 3, 4, 5, 6]; + +const QUARTER_SEGMENT = [1]; +const HALF_SEGMENT = [1, 2]; +const SECOND_HALF_SEGMENT = [3, 4]; +const THREE_QUARTER_SEGMENT = [1, 2, 3]; +const EXTRA_HALF_SEGMENT = [5, 6]; +const MIDDLE_HALF_SEGMENT = [2, 3]; +const LAST_QUARTER_SEGMENT = [4]; +const FOURTH_HALF_SEGMENT = [7, 8]; +const HALF_THIRD_SEGMENT = [9, 10]; +const LATTER_HALF_THIRD_SEGMENT = [11, 12]; + +const TWO_HALF_SEGMENTS = [1, 2, 1, 2]; + + +/********* + * TESTS * + *********/ + +var tests = + [ + sourceClosedWithoutWrite, + writeOneSegmentThenClose, + simpleWriteThenRead, + writeLittleBeforeReading, + writeMultipleSegmentsThenRead, + writeLotsBeforeReading, + writeLotsBeforeReading2, + writeThenReadPartial, + manyPartialWrites, + partialRead, + partialWrite, + sinkClosedImmediately, + sinkClosedWithReadableData, + sinkClosedAfterWrite, + sourceAndSinkClosed, + sinkAndSourceClosed, + sourceAndSinkClosedWithPendingData, + sinkAndSourceClosedWithPendingData, + ]; +var testIndex = -1; + +function sourceClosedWithoutWrite(next) +{ + var t = new CopyTest("sourceClosedWithoutWrite", next); + + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [NOTHING]); +} + +function writeOneSegmentThenClose(next) +{ + var t = new CopyTest("writeLittleBeforeReading", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT]); +} + +function simpleWriteThenRead(next) +{ + var t = new CopyTest("simpleWriteThenRead", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [SEGMENT]); +} + +function writeLittleBeforeReading(next) +{ + var t = new CopyTest("writeLittleBeforeReading", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT, SEGMENT]); +} + +function writeMultipleSegmentsThenRead(next) +{ + var t = new CopyTest("writeMultipleSegmentsThenRead", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(TWO_SEGMENTS.length, + [FIRST_SEGMENT, SECOND_SEGMENT]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [TWO_SEGMENTS]); +} + +function writeLotsBeforeReading(next) +{ + var t = new CopyTest("writeLotsBeforeReading", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]); + t.expect(Cr.NS_OK, [TWO_SEGMENTS, SEGMENT, SEGMENT]); +} + +function writeLotsBeforeReading2(next) +{ + var t = new CopyTest("writeLotsBeforeReading", next); + + t.addToSource(THREE_SEGMENTS); + t.makeSourceReadable(THREE_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(SECOND_SEGMENT.length, [SECOND_SEGMENT]); + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableAndWaitFor(THIRD_SEGMENT.length, [THIRD_SEGMENT]); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(2 * SEGMENT.length, [SEGMENT, SEGMENT]); + t.expect(Cr.NS_OK, [THREE_SEGMENTS, SEGMENT, SEGMENT]); +} + +function writeThenReadPartial(next) +{ + var t = new CopyTest("writeThenReadPartial", next); + + t.addToSource(SEGMENT_AND_HALF); + t.makeSourceReadable(SEGMENT_AND_HALF.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSource(Cr.NS_OK); + t.makeSinkWritableAndWaitFor(EXTRA_HALF_SEGMENT.length, [EXTRA_HALF_SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT_AND_HALF]); +} + +function manyPartialWrites(next) +{ + var t = new CopyTest("manyPartialWrites", next); + + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + t.makeSinkWritableAndWaitFor(2 * HALF_SEGMENT.length, [TWO_HALF_SEGMENTS]); + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [TWO_HALF_SEGMENTS]); +} + +function partialRead(next) +{ + var t = new CopyTest("partialRead", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.addToSource(HALF_SEGMENT); + t.makeSourceReadable(HALF_SEGMENT.length); + t.makeSinkWritableAndWaitFor(SEGMENT.length, [SEGMENT]); + t.closeSourceAndWaitFor(Cr.NS_OK, HALF_SEGMENT.length, [HALF_SEGMENT]); + t.expect(Cr.NS_OK, [SEGMENT, HALF_SEGMENT]); +} + +function partialWrite(next) +{ + var t = new CopyTest("partialWrite", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length, + [QUARTER_SEGMENT, + MIDDLE_HALF_SEGMENT, + LAST_QUARTER_SEGMENT]); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.makeSinkWritableByIncrementsAndWaitFor(SEGMENT.length, + [HALF_SEGMENT, SECOND_HALF_SEGMENT]); + + t.addToSource(THREE_SEGMENTS); + t.makeSourceReadable(THREE_SEGMENTS.length); + t.makeSinkWritableByIncrementsAndWaitFor(THREE_SEGMENTS.length, + [HALF_SEGMENT, SECOND_HALF_SEGMENT, + SECOND_SEGMENT, + HALF_THIRD_SEGMENT, + LATTER_HALF_THIRD_SEGMENT]); + + t.closeSource(Cr.NS_OK); + t.expect(Cr.NS_OK, [SEGMENT, SEGMENT, THREE_SEGMENTS]); +} + +function sinkClosedImmediately(next) +{ + var t = new CopyTest("sinkClosedImmediately", next); + + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]); +} + +function sinkClosedWithReadableData(next) +{ + var t = new CopyTest("sinkClosedWithReadableData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [NOTHING]); +} + +function sinkClosedAfterWrite(next) +{ + var t = new CopyTest("sinkClosedAfterWrite", next); + + t.addToSource(TWO_SEGMENTS); + t.makeSourceReadable(TWO_SEGMENTS.length); + t.makeSinkWritableAndWaitFor(FIRST_SEGMENT.length, [FIRST_SEGMENT]); + t.closeSink(Cr.NS_OK); + t.expect(Cr.NS_ERROR_UNEXPECTED, [FIRST_SEGMENT]); +} + +function sourceAndSinkClosed(next) +{ + var t = new CopyTest("sourceAndSinkClosed", next); + + t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK); + t.expect(Cr.NS_OK, []); +} + +function sinkAndSourceClosed(next) +{ + var t = new CopyTest("sinkAndSourceClosed", next); + + t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK); + + // sink notify received first, hence error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + +function sourceAndSinkClosedWithPendingData(next) +{ + var t = new CopyTest("sourceAndSinkClosedWithPendingData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + + t.closeSourceThenSink(Cr.NS_OK, Cr.NS_OK); + + // not all data from source copied, so error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + +function sinkAndSourceClosedWithPendingData(next) +{ + var t = new CopyTest("sinkAndSourceClosedWithPendingData", next); + + t.addToSource(SEGMENT); + t.makeSourceReadable(SEGMENT.length); + + t.closeSinkThenSource(Cr.NS_OK, Cr.NS_OK); + + // not all data from source copied, plus sink notify received first, so error + t.expect(Cr.NS_ERROR_UNEXPECTED, []); +} + + +/************* + * UTILITIES * + *************/ + +/** Returns the sum of the elements in arr. */ +function sum(arr) +{ + var sum = 0; + for (var i = 0, sz = arr.length; i < sz; i++) + sum += arr[i]; + return sum; +} + +/** + * Returns a constructor for an input or output stream callback that will wrap + * the one provided to it as an argument. + * + * @param wrapperCallback : (nsIInputStreamCallback | nsIOutputStreamCallback) : void + * the original callback object (not a function!) being wrapped + * @param name : string + * either "onInputStreamReady" if we're wrapping an input stream callback or + * "onOutputStreamReady" if we're wrapping an output stream callback + * @returns function(nsIInputStreamCallback | nsIOutputStreamCallback) : (nsIInputStreamCallback | nsIOutputStreamCallback) + * a constructor function which constructs a callback object (not function!) + * which, when called, first calls the original callback provided to it and + * then calls wrapperCallback + */ +function createStreamReadyInterceptor(wrapperCallback, name) +{ + return function StreamReadyInterceptor(callback) + { + this.wrappedCallback = callback; + this[name] = function streamReadyInterceptor(stream) + { + dumpn("*** StreamReadyInterceptor." + name); + + try + { + dumpn("*** calling original " + name + "..."); + callback[name](stream); + } + catch (e) + { + dumpn("!!! error running inner callback: " + e); + throw e; + } + finally + { + dumpn("*** calling wrapper " + name + "..."); + wrapperCallback[name](stream); + } + } + }; +} + +/** + * Print out a banner with the given message, uppercased, for debugging + * purposes. + */ +function note(m) +{ + m = m.toUpperCase(); + var asterisks = Array(m.length + 1 + 4).join("*"); + dumpn(asterisks + "\n* " + m + " *\n" + asterisks); +} + + +/*********** + * MOCKERY * + ***********/ + +/* + * Blatantly violate abstractions in the name of testability. THIS IS NOT + * PUBLIC API! If you use any of these I will knowingly break your code by + * changing the names of variables and properties. + */ +var BinaryInputStream = function BIS(stream) { return stream; }; +var BinaryOutputStream = function BOS(stream) { return stream; }; +Response.SEGMENT_SIZE = SEGMENT.length; + +/** + * Roughly mocks an nsIPipe, presenting non-blocking input and output streams + * that appear to also be binary streams and whose readability and writability + * amounts are configurable. Only the methods used in this test have been + * implemented -- these aren't exact mocks (can't be, actually, because input + * streams have unscriptable methods). + * + * @param name : string + * a name for this pipe, used in debugging output + */ +function CustomPipe(name) +{ + var self = this; + + /** Data read from input that's buffered until it can be written to output. */ + this._data = []; + + /** + * The status of this pipe, which is to say the error result the ends of this + * pipe will return when attempts are made to use them. This value is always + * an error result when copying has finished, because success codes are + * converted to NS_BASE_STREAM_CLOSED. + */ + this._status = Cr.NS_OK; + + /** The input end of this pipe. */ + var input = this.inputStream = + { + /** A name for this stream, used in debugging output. */ + name: name + " input", + + /** + * The number of bytes of data available to be read from this pipe, or + * Infinity if any amount of data in this pipe is made readable as soon as + * it is written to the pipe output. + */ + _readable: 0, + + /** + * Data regarding a pending stream-ready callback on this, or null if no + * callback is currently waiting to be called. + */ + _waiter: null, + + /** + * The event currently dispatched to make a stream-ready callback, if any + * such callback is currently ready to be made and not already in + * progress, or null when no callback is waiting to happen. + */ + _event: null, + + /** + * A stream-ready constructor to wrap an existing callback to intercept + * stream-ready notifications, or null if notifications shouldn't be + * wrapped at all. + */ + _streamReadyInterceptCreator: null, + + /** + * Registers a stream-ready wrapper creator function so that a + * stream-ready callback made in the future can be wrapped. + */ + interceptStreamReadyCallbacks: function(streamReadyInterceptCreator) + { + dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks"); + + do_check_true(this._streamReadyInterceptCreator === null, + "intercepting twice"); + this._streamReadyInterceptCreator = streamReadyInterceptCreator; + if (this._waiter) + { + this._waiter.callback = + new streamReadyInterceptCreator(this._waiter.callback); + } + }, + + /** + * Removes a previously-registered stream-ready wrapper creator function, + * also clearing any current wrapping. + */ + removeStreamReadyInterceptor: function() + { + dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "removing interceptor when none present?"); + this._streamReadyInterceptCreator = null; + if (this._waiter) + this._waiter.callback = this._waiter.callback.wrappedCallback; + }, + + // + // see nsIAsyncInputStream.asyncWait + // + asyncWait: function asyncWait(callback, flags, requestedCount, target) + { + dumpn("*** [" + this.name + "].asyncWait"); + + do_check_true(callback && typeof callback !== "function"); + + var closureOnly = + (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0; + + do_check_true(this._waiter === null || + (this._waiter.closureOnly && !closureOnly), + "asyncWait already called with a non-closure-only " + + "callback? unexpected!"); + + this._waiter = + { + callback: + this._streamReadyInterceptCreator + ? new this._streamReadyInterceptCreator(callback) + : callback, + closureOnly: closureOnly, + requestedCount: requestedCount, + eventTarget: target + }; + + if (!Components.isSuccessCode(self._status) || + (!closureOnly && this._readable >= requestedCount && + self._data.length >= requestedCount)) + { + this._notify(); + } + }, + + // + // see nsIAsyncInputStream.closeWithStatus + // + closeWithStatus: function closeWithStatus(status) + { + dumpn("*** [" + this.name + "].closeWithStatus" + + "(" + status + ")"); + + if (!Components.isSuccessCode(self._status)) + { + dumpn("*** ignoring second closure of [input " + this.name + "] " + + "(status " + self._status + ")"); + return; + } + + if (Components.isSuccessCode(status)) + status = Cr.NS_BASE_STREAM_CLOSED; + + self._status = status; + + if (this._waiter) + this._notify(); + if (output._waiter) + output._notify(); + }, + + // + // see nsIBinaryInputStream.readByteArray + // + readByteArray: function readByteArray(count) + { + dumpn("*** [" + this.name + "].readByteArray(" + count + ")"); + + if (self._data.length === 0) + { + throw Components.isSuccessCode(self._status) + ? Cr.NS_BASE_STREAM_WOULD_BLOCK + : self._status; + } + + do_check_true(this._readable <= self._data.length || + this._readable === Infinity, + "consistency check"); + + if (this._readable < count || self._data.length < count) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + this._readable -= count; + return self._data.splice(0, count); + }, + + /** + * Makes the given number of additional bytes of data previously written + * to the pipe's output stream available for reading, triggering future + * notifications when required. + * + * @param count : uint + * the number of bytes of additional data to make available; must not be + * greater than the number of bytes already buffered but not made + * available by previous makeReadable calls + */ + makeReadable: function makeReadable(count) + { + dumpn("*** [" + this.name + "].makeReadable(" + count + ")"); + + do_check_true(Components.isSuccessCode(self._status), "errant call"); + do_check_true(this._readable + count <= self._data.length || + this._readable === Infinity, + "increasing readable beyond written amount"); + + this._readable += count; + + dumpn("readable: " + this._readable + ", data: " + self._data); + + var waiter = this._waiter; + if (waiter !== null) + { + if (waiter.requestedCount <= this._readable && !waiter.closureOnly) + this._notify(); + } + }, + + /** + * Disables the readability limit on this stream, meaning that as soon as + * *any* amount of data is written to output it becomes available from + * this stream and a stream-ready event is dispatched (if any stream-ready + * callback is currently set). + */ + disableReadabilityLimit: function disableReadabilityLimit() + { + dumpn("*** [" + this.name + "].disableReadabilityLimit()"); + + this._readable = Infinity; + }, + + // + // see nsIInputStream.available + // + available: function available() + { + dumpn("*** [" + this.name + "].available()"); + + if (self._data.length === 0 && !Components.isSuccessCode(self._status)) + throw self._status; + + return Math.min(this._readable, self._data.length); + }, + + /** + * Dispatches a pending stream-ready event ahead of schedule, rather than + * waiting for it to be dispatched in response to normal writes. This is + * useful when writing to the output has completed, and we need to have + * read all data written to this stream. If the output isn't closed and + * the reading of data from this races ahead of the last write to output, + * we need a notification to know when everything that's been written has + * been read. This ordinarily might be supplied by closing output, but + * in some cases it's not desirable to close output, so this supplies an + * alternative method to get notified when the last write has occurred. + */ + maybeNotifyFinally: function maybeNotifyFinally() + { + dumpn("*** [" + this.name + "].maybeNotifyFinally()"); + + do_check_true(this._waiter !== null, "must be waiting now"); + + if (self._data.length > 0) + { + dumpn("*** data still pending, normal notifications will signal " + + "completion"); + return; + } + + // No data waiting to be written, so notify. We could just close the + // stream, but that's less faithful to the server's behavior (it doesn't + // close the stream, and we're pretending to impersonate the server as + // much as we can here), so instead we're going to notify when no data + // can be read. The CopyTest has already been flagged as complete, so + // the stream listener will detect that this is a wrap-it-up notify and + // invoke the next test. + this._notify(); + }, + + /** + * Dispatches an event to call a previously-registered stream-ready + * callback. + */ + _notify: function _notify() + { + dumpn("*** [" + this.name + "]._notify()"); + + var waiter = this._waiter; + do_check_true(waiter !== null, "no waiter?"); + + if (this._event === null) + { + var event = this._event = + { + run: function run() + { + input._waiter = null; + input._event = null; + try + { + do_check_true(!Components.isSuccessCode(self._status) || + input._readable >= waiter.requestedCount); + waiter.callback.onInputStreamReady(input); + } + catch (e) + { + do_throw("error calling onInputStreamReady: " + e); + } + } + }; + waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + } + }, + + QueryInterface: function QueryInterface(iid) + { + if (iid.equals(Ci.nsIAsyncInputStream) || + iid.equals(Ci.nsIInputStream) || + iid.equals(Ci.nsISupports)) + { + return this; + } + + throw Cr.NS_ERROR_NO_INTERFACE; + } + }; + + /** The output end of this pipe. */ + var output = this.outputStream = + { + /** A name for this stream, used in debugging output. */ + name: name + " output", + + /** + * The number of bytes of data which may be written to this pipe without + * blocking. + */ + _writable: 0, + + /** + * The increments in which pending data should be written, rather than + * simply defaulting to the amount requested (which, given that + * input.asyncWait precisely respects the requestedCount argument, will + * ordinarily always be writable in that amount), as an array whose + * elements from start to finish are the number of bytes to write each + * time write() or writeByteArray() is subsequently called. The sum of + * the values in this array, if this array is not empty, is always equal + * to this._writable. + */ + _writableAmounts: [], + + /** + * Data regarding a pending stream-ready callback on this, or null if no + * callback is currently waiting to be called. + */ + _waiter: null, + + /** + * The event currently dispatched to make a stream-ready callback, if any + * such callback is currently ready to be made and not already in + * progress, or null when no callback is waiting to happen. + */ + _event: null, + + /** + * A stream-ready constructor to wrap an existing callback to intercept + * stream-ready notifications, or null if notifications shouldn't be + * wrapped at all. + */ + _streamReadyInterceptCreator: null, + + /** + * Registers a stream-ready wrapper creator function so that a + * stream-ready callback made in the future can be wrapped. + */ + interceptStreamReadyCallbacks: function(streamReadyInterceptCreator) + { + dumpn("*** [" + this.name + "].interceptStreamReadyCallbacks"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "intercepting onOutputStreamReady twice"); + this._streamReadyInterceptCreator = streamReadyInterceptCreator; + if (this._waiter) + { + this._waiter.callback = + new streamReadyInterceptCreator(this._waiter.callback); + } + }, + + /** + * Removes a previously-registered stream-ready wrapper creator function, + * also clearing any current wrapping. + */ + removeStreamReadyInterceptor: function() + { + dumpn("*** [" + this.name + "].removeStreamReadyInterceptor()"); + + do_check_true(this._streamReadyInterceptCreator !== null, + "removing interceptor when none present?"); + this._streamReadyInterceptCreator = null; + if (this._waiter) + this._waiter.callback = this._waiter.callback.wrappedCallback; + }, + + // + // see nsIAsyncOutputStream.asyncWait + // + asyncWait: function asyncWait(callback, flags, requestedCount, target) + { + dumpn("*** [" + this.name + "].asyncWait"); + + do_check_true(callback && typeof callback !== "function"); + + var closureOnly = + (flags & Ci.nsIAsyncInputStream.WAIT_CLOSURE_ONLY) !== 0; + + do_check_true(this._waiter === null || + (this._waiter.closureOnly && !closureOnly), + "asyncWait already called with a non-closure-only " + + "callback? unexpected!"); + + this._waiter = + { + callback: + this._streamReadyInterceptCreator + ? new this._streamReadyInterceptCreator(callback) + : callback, + closureOnly: closureOnly, + requestedCount: requestedCount, + eventTarget: target, + toString: function toString() + { + return "waiter(" + (closureOnly ? "closure only, " : "") + + "requestedCount: " + requestedCount + ", target: " + + target + ")"; + } + }; + + if ((!closureOnly && this._writable >= requestedCount) || + !Components.isSuccessCode(this.status)) + { + this._notify(); + } + }, + + // + // see nsIAsyncOutputStream.closeWithStatus + // + closeWithStatus: function closeWithStatus(status) + { + dumpn("*** [" + this.name + "].closeWithStatus(" + status + ")"); + + if (!Components.isSuccessCode(self._status)) + { + dumpn("*** ignoring redundant closure of [input " + this.name + "] " + + "because it's already closed (status " + self._status + ")"); + return; + } + + if (Components.isSuccessCode(status)) + status = Cr.NS_BASE_STREAM_CLOSED; + + self._status = status; + + if (input._waiter) + input._notify(); + if (this._waiter) + this._notify(); + }, + + // + // see nsIBinaryOutputStream.writeByteArray + // + writeByteArray: function writeByteArray(bytes, length) + { + dumpn("*** [" + this.name + "].writeByteArray" + + "([" + bytes + "], " + length + ")"); + + do_check_eq(bytes.length, length, "sanity"); + if (!Components.isSuccessCode(self._status)) + throw self._status; + + do_check_eq(this._writableAmounts.length, 0, + "writeByteArray can't support specified-length writes"); + + if (this._writable < length) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + + self._data.push.apply(self._data, bytes); + this._writable -= length; + + if (input._readable === Infinity && input._waiter && + !input._waiter.closureOnly) + { + input._notify(); + } + }, + + // + // see nsIOutputStream.write + // + write: function write(str, length) + { + dumpn("*** [" + this.name + "].write"); + + do_check_eq(str.length, length, "sanity"); + if (!Components.isSuccessCode(self._status)) + throw self._status; + if (this._writable === 0) + throw Cr.NS_BASE_STREAM_WOULD_BLOCK; + + var actualWritten; + if (this._writableAmounts.length === 0) + { + actualWritten = Math.min(this._writable, length); + } + else + { + do_check_true(this._writable >= this._writableAmounts[0], + "writable amounts value greater than writable data?"); + do_check_eq(this._writable, sum(this._writableAmounts), + "total writable amount not equal to sum of writable " + + "increments"); + actualWritten = this._writableAmounts.shift(); + } + + var bytes = str.substring(0, actualWritten) + .split("") + .map(function(v) { return v.charCodeAt(0); }); + + self._data.push.apply(self._data, bytes); + this._writable -= actualWritten; + + if (input._readable === Infinity && input._waiter && + !input._waiter.closureOnly) + { + input._notify(); + } + + return actualWritten; + }, + + /** + * Increase the amount of data that can be written without blocking by the + * given number of bytes, triggering future notifications when required. + * + * @param count : uint + * the number of bytes of additional data to make writable + */ + makeWritable: function makeWritable(count) + { + dumpn("*** [" + this.name + "].makeWritable(" + count + ")"); + + do_check_true(Components.isSuccessCode(self._status)); + + this._writable += count; + + var waiter = this._waiter; + if (waiter && !waiter.closureOnly && + waiter.requestedCount <= this._writable) + { + this._notify(); + } + }, + + /** + * Increase the amount of data that can be written without blocking, but + * do so by specifying a number of bytes that will be written each time + * a write occurs, even as asyncWait notifications are initially triggered + * as usual. Thus, rather than writes eagerly writing everything possible + * at each step, attempts to write out data by segment devolve into a + * partial segment write, then another, and so on until the amount of data + * specified as permitted to be written, has been written. + * + * Note that the writeByteArray method is incompatible with the previous + * calling of this method, in that, until all increments provided to this + * method have been consumed, writeByteArray cannot be called. Once all + * increments have been consumed, writeByteArray may again be called. + * + * @param increments : [uint] + * an array whose elements are positive numbers of bytes to permit to be + * written each time write() is subsequently called on this, ignoring + * the total amount of writable space specified by the sum of all + * increments + */ + makeWritableByIncrements: function makeWritableByIncrements(increments) + { + dumpn("*** [" + this.name + "].makeWritableByIncrements" + + "([" + increments.join(", ") + "])"); + + do_check_true(increments.length > 0, "bad increments"); + do_check_true(increments.every(function(v) { return v > 0; }), + "zero increment?"); + + do_check_true(Components.isSuccessCode(self._status)); + + this._writable += sum(increments); + this._writableAmounts = increments; + + var waiter = this._waiter; + if (waiter && !waiter.closureOnly && + waiter.requestedCount <= this._writable) + { + this._notify(); + } + }, + + /** + * Dispatches an event to call a previously-registered stream-ready + * callback. + */ + _notify: function _notify() + { + dumpn("*** [" + this.name + "]._notify()"); + + var waiter = this._waiter; + do_check_true(waiter !== null, "no waiter?"); + + if (this._event === null) + { + var event = this._event = + { + run: function run() + { + output._waiter = null; + output._event = null; + + try + { + waiter.callback.onOutputStreamReady(output); + } + catch (e) + { + do_throw("error calling onOutputStreamReady: " + e); + } + } + }; + waiter.eventTarget.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + } + }, + + QueryInterface: function QueryInterface(iid) + { + if (iid.equals(Ci.nsIAsyncOutputStream) || + iid.equals(Ci.nsIOutputStream) || + iid.equals(Ci.nsISupports)) + { + return this; + } + + throw Cr.NS_ERROR_NO_INTERFACE; + } + }; +} + +/** + * Represents a sequence of interactions to perform with a copier, in a given + * order and at the desired time intervals. + * + * @param name : string + * test name, used in debugging output + */ +function CopyTest(name, next) +{ + /** Name used in debugging output. */ + this.name = name; + + /** A function called when the test completes. */ + this._done = next; + + var sourcePipe = new CustomPipe(name + "-source"); + + /** The source of data for the copier to copy. */ + this._source = sourcePipe.inputStream; + + /** + * The sink to which to write data which will appear in the copier's source. + */ + this._copyableDataStream = sourcePipe.outputStream; + + var sinkPipe = new CustomPipe(name + "-sink"); + + /** The sink to which the copier copies data. */ + this._sink = sinkPipe.outputStream; + + /** Input stream from which to read data the copier's written to its sink. */ + this._copiedDataStream = sinkPipe.inputStream; + + this._copiedDataStream.disableReadabilityLimit(); + + /** + * True if there's a callback waiting to read data written by the copier to + * its output, from the input end of the pipe representing the copier's sink. + */ + this._waitingForData = false; + + /** + * An array of the bytes of data expected to be written to output by the + * copier when this test runs. + */ + this._expectedData = undefined; + + /** Array of bytes of data received so far. */ + this._receivedData = []; + + /** The expected final status returned by the copier. */ + this._expectedStatus = -1; + + /** The actual final status returned by the copier. */ + this._actualStatus = -1; + + /** The most recent sequence of bytes written to output by the copier. */ + this._lastQuantum = []; + + /** + * True iff we've received the last quantum of data written to the sink by the + * copier. + */ + this._allDataWritten = false; + + /** + * True iff the copier has notified its associated stream listener of + * completion. + */ + this._copyingFinished = false; + + /** Index of the next task to execute while driving the copier. */ + this._currentTask = 0; + + /** Array containing all tasks to run. */ + this._tasks = []; + + /** The copier used by this test. */ + this._copier = + new WriteThroughCopier(this._source, this._sink, this, null); + + // Start watching for data written by the copier to the sink. + this._waitForWrittenData(); +} +CopyTest.prototype = +{ + /** + * Adds the given array of bytes to data in the copier's source. + * + * @param bytes : [uint] + * array of bytes of data to add to the source for the copier + */ + addToSource: function addToSource(bytes) + { + var self = this; + this._addToTasks(function addToSourceTask() + { + note("addToSourceTask"); + + try + { + self._copyableDataStream.makeWritable(bytes.length); + self._copyableDataStream.writeByteArray(bytes, bytes.length); + } + finally + { + self._stageNextTask(); + } + }); + }, + + /** + * Makes bytes of data previously added to the source available to be read by + * the copier. + * + * @param count : uint + * number of bytes to make available for reading + */ + makeSourceReadable: function makeSourceReadable(count) + { + var self = this; + this._addToTasks(function makeSourceReadableTask() + { + note("makeSourceReadableTask"); + + self._source.makeReadable(count); + self._stageNextTask(); + }); + }, + + /** + * Increases available space in the sink by the given amount, waits for the + * given series of arrays of bytes to be written to sink by the copier, and + * causes execution to asynchronously continue to the next task when the last + * of those arrays of bytes is received. + * + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + makeSinkWritableAndWaitFor: + function makeSinkWritableAndWaitFor(bytes, dataQuantums) + { + var self = this; + + do_check_eq(bytes, + dataQuantums.reduce(function(partial, current) + { + return partial + current.length; + }, 0), + "bytes/quantums mismatch"); + + function increaseSinkSpaceTask() + { + /* Now do the actual work to trigger the interceptor. */ + self._sink.makeWritable(bytes); + } + + this._waitForHelper("increaseSinkSpaceTask", + dataQuantums, increaseSinkSpaceTask); + }, + + /** + * Increases available space in the sink by the given amount, waits for the + * given series of arrays of bytes to be written to sink by the copier, and + * causes execution to asynchronously continue to the next task when the last + * of those arrays of bytes is received. + * + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + makeSinkWritableByIncrementsAndWaitFor: + function makeSinkWritableByIncrementsAndWaitFor(bytes, dataQuantums) + { + var self = this; + + var desiredAmounts = dataQuantums.map(function(v) { return v.length; }); + do_check_eq(bytes, sum(desiredAmounts), "bytes/quantums mismatch"); + + function increaseSinkSpaceByIncrementsTask() + { + /* Now do the actual work to trigger the interceptor incrementally. */ + self._sink.makeWritableByIncrements(desiredAmounts); + } + + this._waitForHelper("increaseSinkSpaceByIncrementsTask", + dataQuantums, increaseSinkSpaceByIncrementsTask); + }, + + /** + * Close the copier's source stream, then asynchronously continue to the next + * task. + * + * @param status : nsresult + * the status to provide when closing the copier's source stream + */ + closeSource: function closeSource(status) + { + var self = this; + + this._addToTasks(function closeSourceTask() + { + note("closeSourceTask"); + + self._source.closeWithStatus(status); + self._stageNextTask(); + }); + }, + + /** + * Close the copier's source stream, then wait for the given number of bytes + * and for the given series of arrays of bytes to be written to the sink, then + * asynchronously continue to the next task. + * + * @param status : nsresult + * the status to provide when closing the copier's source stream + * @param bytes : uint + * number of bytes of space to make available in the sink + * @param dataQuantums : [[uint]] + * array of byte arrays to expect to be written in sequence to the sink + */ + closeSourceAndWaitFor: + function closeSourceAndWaitFor(status, bytes, dataQuantums) + { + var self = this; + + do_check_eq(bytes, sum(dataQuantums.map(function(v) { return v.length; })), + "bytes/quantums mismatch"); + + function closeSourceAndWaitForTask() + { + self._sink.makeWritable(bytes); + self._copyableDataStream.closeWithStatus(status); + } + + this._waitForHelper("closeSourceAndWaitForTask", + dataQuantums, closeSourceAndWaitForTask); + }, + + /** + * Closes the copier's sink stream, providing the given status, then + * asynchronously continue to the next task. + * + * @param status : nsresult + * the status to provide when closing the copier's sink stream + */ + closeSink: function closeSink(status) + { + var self = this; + this._addToTasks(function closeSinkTask() + { + note("closeSinkTask"); + + self._sink.closeWithStatus(status); + self._stageNextTask(); + }); + }, + + /** + * Closes the copier's source stream, then immediately closes the copier's + * sink stream, then asynchronously continues to the next task. + * + * @param sourceStatus : nsresult + * the status to provide when closing the copier's source stream + * @param sinkStatus : nsresult + * the status to provide when closing the copier's sink stream + */ + closeSourceThenSink: function closeSourceThenSink(sourceStatus, sinkStatus) + { + var self = this; + this._addToTasks(function closeSourceThenSinkTask() + { + note("closeSourceThenSinkTask"); + + self._source.closeWithStatus(sourceStatus); + self._sink.closeWithStatus(sinkStatus); + self._stageNextTask(); + }); + }, + + /** + * Closes the copier's sink stream, then immediately closes the copier's + * source stream, then asynchronously continues to the next task. + * + * @param sinkStatus : nsresult + * the status to provide when closing the copier's sink stream + * @param sourceStatus : nsresult + * the status to provide when closing the copier's source stream + */ + closeSinkThenSource: function closeSinkThenSource(sinkStatus, sourceStatus) + { + var self = this; + this._addToTasks(function closeSinkThenSourceTask() + { + note("closeSinkThenSource"); + + self._sink.closeWithStatus(sinkStatus); + self._source.closeWithStatus(sourceStatus); + self._stageNextTask(); + }); + }, + + /** + * Indicates that the given status is expected to be returned when the stream + * listener for the copy indicates completion, that the expected data copied + * by the copier to sink are the concatenation of the arrays of bytes in + * receivedData, and kicks off the tasks in this test. + * + * @param expectedStatus : nsresult + * the status expected to be returned by the copier at completion + * @param receivedData : [[uint]] + * an array containing arrays of bytes whose concatenation constitutes the + * expected copied data + */ + expect: function expect(expectedStatus, receivedData) + { + this._expectedStatus = expectedStatus; + this._expectedData = []; + for (var i = 0, sz = receivedData.length; i < sz; i++) + this._expectedData.push.apply(this._expectedData, receivedData[i]); + + this._stageNextTask(); + }, + + /** + * Sets up a stream interceptor that will verify that each piece of data + * written to the sink by the copier corresponds to the currently expected + * pieces of data, calls the trigger, then waits for those pieces of data to + * be received. Once all have been received, the interceptor is removed and + * the next task is asynchronously executed. + * + * @param name : string + * name of the task created by this, used in debugging output + * @param dataQuantums : [[uint]] + * array of expected arrays of bytes to be written to the sink by the copier + * @param trigger : function() : void + * function to call after setting up the interceptor to wait for + * notifications (which will be generated as a result of this function's + * actions) + */ + _waitForHelper: function _waitForHelper(name, dataQuantums, trigger) + { + var self = this; + this._addToTasks(function waitForHelperTask() + { + note(name); + + var quantumIndex = 0; + + /* + * Intercept all data-available notifications so we can continue when all + * the ones we expect have been received. + */ + var streamReadyCallback = + { + onInputStreamReady: function wrapperOnInputStreamReady(input) + { + dumpn("*** streamReadyCallback.onInputStreamReady" + + "(" + input.name + ")"); + + do_check_eq(this, streamReadyCallback, "sanity"); + + try + { + if (quantumIndex < dataQuantums.length) + { + var quantum = dataQuantums[quantumIndex++]; + var sz = quantum.length; + do_check_eq(self._lastQuantum.length, sz, + "different quantum lengths"); + for (var i = 0; i < sz; i++) + { + do_check_eq(self._lastQuantum[i], quantum[i], + "bad data at " + i); + } + + dumpn("*** waiting to check remaining " + + (dataQuantums.length - quantumIndex) + " quantums..."); + } + } + finally + { + if (quantumIndex === dataQuantums.length) + { + dumpn("*** data checks completed! next task..."); + self._copiedDataStream.removeStreamReadyInterceptor(); + self._stageNextTask(); + } + } + } + }; + + var interceptor = + createStreamReadyInterceptor(streamReadyCallback, "onInputStreamReady"); + self._copiedDataStream.interceptStreamReadyCallbacks(interceptor); + + /* Do the deed. */ + trigger(); + }); + }, + + /** + * Initiates asynchronous waiting for data written to the copier's sink to be + * available for reading from the input end of the sink's pipe. The callback + * stores the received data for comparison in the interceptor used in the + * callback added by _waitForHelper and signals test completion when it + * receives a zero-data-available notification (if the copier has notified + * that it is finished; otherwise allows execution to continue until that has + * occurred). + */ + _waitForWrittenData: function _waitForWrittenData() + { + dumpn("*** _waitForWrittenData (" + this.name + ")"); + + var self = this; + var outputWrittenWatcher = + { + onInputStreamReady: function onInputStreamReady(input) + { + dumpn("*** outputWrittenWatcher.onInputStreamReady" + + "(" + input.name + ")"); + + if (self._allDataWritten) + { + do_throw("ruh-roh! why are we getting notified of more data " + + "after we should have received all of it?"); + } + + self._waitingForData = false; + + try + { + var avail = input.available(); + } + catch (e) + { + dumpn("*** available() threw! error: " + e); + if (self._completed) + { + dumpn("*** NB: this isn't a problem, because we've copied " + + "completely now, and this notify may have been expedited " + + "by maybeNotifyFinally such that we're being called when " + + "we can *guarantee* nothing is available any more"); + } + avail = 0; + } + + if (avail > 0) + { + var data = input.readByteArray(avail); + do_check_eq(data.length, avail, + "readByteArray returned wrong number of bytes?"); + self._lastQuantum = data; + self._receivedData.push.apply(self._receivedData, data); + } + + if (avail === 0) + { + dumpn("*** all data received!"); + + self._allDataWritten = true; + + if (self._copyingFinished) + { + dumpn("*** copying already finished, continuing to next test"); + self._testComplete(); + } + else + { + dumpn("*** copying not finished, waiting for that to happen"); + } + + return; + } + + self._waitForWrittenData(); + } + }; + + this._copiedDataStream.asyncWait(outputWrittenWatcher, 0, 1, + gThreadManager.currentThread); + this._waitingForData = true; + }, + + /** + * Indicates this test is complete, does the final data-received and copy + * status comparisons, and calls the test-completion function provided when + * this test was first created. + */ + _testComplete: function _testComplete() + { + dumpn("*** CopyTest(" + this.name + ") complete! " + + "On to the next test..."); + + try + { + do_check_true(this._allDataWritten, "expect all data written now!"); + do_check_true(this._copyingFinished, "expect copying finished now!"); + + do_check_eq(this._actualStatus, this._expectedStatus, + "wrong final status"); + + var expected = this._expectedData, received = this._receivedData; + dumpn("received: [" + received + "], expected: [" + expected + "]"); + do_check_eq(received.length, expected.length, "wrong data"); + for (var i = 0, sz = expected.length; i < sz; i++) + do_check_eq(received[i], expected[i], "bad data at " + i); + } + catch (e) + { + dumpn("!!! ERROR PERFORMING FINAL " + this.name + " CHECKS! " + e); + throw e; + } + finally + { + dumpn("*** CopyTest(" + this.name + ") complete! " + + "Invoking test-completion callback..."); + this._done(); + } + }, + + /** Dispatches an event at this thread which will run the next task. */ + _stageNextTask: function _stageNextTask() + { + dumpn("*** CopyTest(" + this.name + ")._stageNextTask()"); + + if (this._currentTask === this._tasks.length) + { + dumpn("*** CopyTest(" + this.name + ") tasks complete!"); + return; + } + + var task = this._tasks[this._currentTask++]; + var self = this; + var event = + { + run: function run() + { + try + { + task(); + } + catch (e) + { + do_throw("exception thrown running task: " + e); + } + } + }; + gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL); + }, + + /** + * Adds the given function as a task to be run at a later time. + * + * @param task : function() : void + * the function to call as a task + */ + _addToTasks: function _addToTasks(task) + { + this._tasks.push(task); + }, + + // + // see nsIRequestObserver.onStartRequest + // + onStartRequest: function onStartRequest(self, _) + { + dumpn("*** CopyTest.onStartRequest (" + self.name + ")"); + + do_check_true(_ === null); + do_check_eq(this._receivedData.length, 0); + do_check_eq(this._lastQuantum.length, 0); + }, + + // + // see nsIRequestObserver.onStopRequest + // + onStopRequest: function onStopRequest(self, _, status) + { + dumpn("*** CopyTest.onStopRequest (" + self.name + ", " + status + ")"); + + do_check_true(_ === null); + this._actualStatus = status; + + this._copyingFinished = true; + + if (this._allDataWritten) + { + dumpn("*** all data written, continuing with remaining tests..."); + this._testComplete(); + } + else + { + /* + * Everything's copied as far as the copier is concerned. However, there + * may be a backup transferring from the output end of the copy sink to + * the input end where we can actually verify that the expected data was + * written as expected, because that transfer occurs asynchronously. If + * we do final data-received checks now, we'll miss still-pending data. + * Therefore, to wrap up this copy test we still need to asynchronously + * wait on the input end of the sink until we hit end-of-stream or some + * error condition. Then we know we're done and can continue with the + * next test. + */ + dumpn("*** not all data copied, waiting for that to happen..."); + + if (!this._waitingForData) + this._waitForWrittenData(); + + this._copiedDataStream.maybeNotifyFinally(); + } + } +}; diff --git a/netwerk/test/httpserver/test/test_basic_functionality.js b/netwerk/test/httpserver/test/test_basic_functionality.js index 30d997b47507..2c2ed04f7e70 100644 --- a/netwerk/test/httpserver/test/test_basic_functionality.js +++ b/netwerk/test/httpserver/test/test_basic_functionality.js @@ -48,6 +48,8 @@ var tests = null, start_functionHandler, null), new Test("http://localhost:4444/non-existent-path", null, start_non_existent_path, null), + new Test("http://localhost:4444/lotsOfHeaders", + null, start_lots_of_headers, null), ]; function run_test() @@ -64,12 +66,14 @@ function run_test() // register a few test paths srv.registerPathHandler("/objHandler", objHandler); srv.registerPathHandler("/functionHandler", functionHandler); + srv.registerPathHandler("/lotsOfHeaders", lotsOfHeadersHandler); srv.start(4444); runHttpTests(tests, testComplete(srv)); } +const HEADER_COUNT = 1000; // TEST DATA @@ -122,6 +126,16 @@ function start_non_existent_path(ch, cx) do_check_false(ch.requestSucceeded); } +function start_lots_of_headers(ch, cx) +{ + commonCheck(ch); + + do_check_eq(ch.responseStatus, 200); + do_check_true(ch.requestSucceeded); + + for (var i = 0; i < HEADER_COUNT; i++) + do_check_eq(ch.getResponseHeader("X-Header-" + i), "value " + i); +} // PATH HANDLERS @@ -175,3 +189,12 @@ function functionHandler(metadata, response) var body = "this is text\n"; response.bodyOutputStream.write(body, body.length); } + +// /lotsOfHeaders +function lotsOfHeadersHandler(request, response) +{ + response.setHeader("Content-Type", "text/plain", false); + + for (var i = 0; i < HEADER_COUNT; i++) + response.setHeader("X-Header-" + i, "value " + i, false); +} diff --git a/netwerk/test/httpserver/test/test_processasync.js b/netwerk/test/httpserver/test/test_processasync.js index 07b07d418bf1..959f35178cd4 100644 --- a/netwerk/test/httpserver/test/test_processasync.js +++ b/netwerk/test/httpserver/test/test_processasync.js @@ -271,43 +271,6 @@ test = new Test(PREPATH + "/handleAsync2", tests.push(test); -const ASYNC_ERROR_BODY = "hi, I'm a body!"; - -function handleAsyncError(request, response) -{ - response.setStatusLine(request.httpVersion, 200, "Async Error"); - response.setHeader("X-Foo", "header value", false); - - response.processAsync(); - - response.write(ASYNC_ERROR_BODY, ASYNC_ERROR_BODY.length); - - // No turning back now -- except if there's an error! - throw "Monkey wrench!"; -} -handlers["/handleAsyncError"] = handleAsyncError; - -function start_handleAsyncError(ch, cx) -{ - do_check_eq(ch.responseStatus, 200); - do_check_eq(ch.responseStatusText, "Async Error"); - do_check_eq(ch.getResponseHeader("X-Foo"), "header value"); -} - -function stop_handleAsyncError(ch, cx, status, data) -{ - // Lies! But not really! - do_check_true(ch.requestSucceeded); - - do_check_eq(data.length, ASYNC_ERROR_BODY.length); - do_check_eq(String.fromCharCode.apply(null, data), ASYNC_ERROR_BODY); -} - -test = new Test(PREPATH + "/handleAsyncError", - null, start_handleAsyncError, stop_handleAsyncError); -tests.push(test); - - /* * Tests that accessing output stream *before* calling processAsync() works * correctly, sending written data immediately as it is written, not buffering diff --git a/netwerk/test/httpserver/test/test_seizepower.js b/netwerk/test/httpserver/test/test_seizepower.js index 65319d3a09bf..434d2dedd25b 100644 --- a/netwerk/test/httpserver/test/test_seizepower.js +++ b/netwerk/test/httpserver/test/test_seizepower.js @@ -54,9 +54,6 @@ function run_test() srv.registerPathHandler("/exceptions", handleExceptions); srv.registerPathHandler("/async-seizure", handleAsyncSeizure); srv.registerPathHandler("/seize-after-async", handleSeizeAfterAsync); - srv.registerPathHandler("/thrown-exception", handleThrownException); - srv.registerPathHandler("/asap-later-write", handleASAPLaterWrite); - srv.registerPathHandler("/asap-later-finish", handleASAPLaterFinish); srv.start(PORT); @@ -165,51 +162,6 @@ function handleSeizeAfterAsync(request, response) }); } -function handleThrownException(request, response) -{ - if (request.queryString === "writeBefore") - response.write("ignore this"); - else if (request.queryString === "writeBeforeEmpty") - response.write(""); - else if (request.queryString !== "") - throw "query string FAIL"; - response.seizePower(); - response.write("preparing to throw..."); - throw "badness 10000"; -} - -function handleASAPLaterWrite(request, response) -{ - response.seizePower(); - response.write("should only "); - response.write("see this"); - - callASAPLater(function() - { - response.write("...and not this"); - callASAPLater(function() - { - response.write("...or this"); - response.finish(); - }); - }); - - throw "opening pitch of the ballgame"; -} - -function handleASAPLaterFinish(request, response) -{ - response.seizePower(); - response.write("should only see this"); - - callASAPLater(function() - { - response.finish(); - }); - - throw "out the bum!"; -} - /*************** * BEGIN TESTS * @@ -262,48 +214,3 @@ function checkSeizeAfterAsync(data) } test = new RawTest("localhost", PORT, data, checkSeizeAfterAsync), tests.push(test); - -data = "GET /thrown-exception?writeBefore HTTP/1.0\r\n" + - "\r\n"; -function checkThrownExceptionWriteBefore(data) -{ - do_check_eq(data, "preparing to throw..."); -} -test = new RawTest("localhost", PORT, data, checkThrownExceptionWriteBefore), -tests.push(test); - -data = "GET /thrown-exception?writeBeforeEmpty HTTP/1.0\r\n" + - "\r\n"; -function checkThrownExceptionWriteBeforeEmpty(data) -{ - do_check_eq(data, "preparing to throw..."); -} -test = new RawTest("localhost", PORT, data, checkThrownExceptionWriteBeforeEmpty), -tests.push(test); - -data = "GET /thrown-exception HTTP/1.0\r\n" + - "\r\n"; -function checkThrownException(data) -{ - do_check_eq(data, "preparing to throw..."); -} -test = new RawTest("localhost", PORT, data, checkThrownException), -tests.push(test); - -data = "GET /asap-later-write HTTP/1.0\r\n" + - "\r\n"; -function checkASAPLaterWrite(data) -{ - do_check_eq(data, "should only see this"); -} -test = new RawTest("localhost", PORT, data, checkASAPLaterWrite), -tests.push(test); - -data = "GET /asap-later-finish HTTP/1.0\r\n" + - "\r\n"; -function checkASAPLaterFinish(data) -{ - do_check_eq(data, "should only see this"); -} -test = new RawTest("localhost", PORT, data, checkASAPLaterFinish), -tests.push(test);