зеркало из https://github.com/mozilla/gecko-dev.git
Bug 513854 - httpd.js should write its response data asynchronously. r=sayrer
This commit is contained in:
Родитель
31fc4565a6
Коммит
23daa1061e
|
@ -1,4 +1,4 @@
|
|||
/* -*- Mode: Java; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* -*- Mode: JavaScript; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
||||
/* vim:set ts=2 sw=2 sts=2 et: */
|
||||
/* ***** BEGIN LICENSE BLOCK *****
|
||||
* Version: MPL 1.1/GPL 2.0/LGPL 2.1
|
||||
|
@ -227,12 +227,6 @@ function getRootPrefBranch()
|
|||
const ServerSocket = CC("@mozilla.org/network/server-socket;1",
|
||||
"nsIServerSocket",
|
||||
"init");
|
||||
const BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
|
||||
"nsIBinaryInputStream",
|
||||
"setInputStream");
|
||||
const BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1",
|
||||
"nsIBinaryOutputStream",
|
||||
"setOutputStream");
|
||||
const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
|
||||
"nsIScriptableInputStream",
|
||||
"init");
|
||||
|
@ -250,6 +244,13 @@ const WritablePropertyBag = CC("@mozilla.org/hash-property-bag;1",
|
|||
const SupportsString = CC("@mozilla.org/supports-string;1",
|
||||
"nsISupportsString");
|
||||
|
||||
/* These two are non-const only so a test can overwrite them. */
|
||||
var BinaryInputStream = CC("@mozilla.org/binaryinputstream;1",
|
||||
"nsIBinaryInputStream",
|
||||
"setInputStream");
|
||||
var BinaryOutputStream = CC("@mozilla.org/binaryoutputstream;1",
|
||||
"nsIBinaryOutputStream",
|
||||
"setOutputStream");
|
||||
|
||||
/**
|
||||
* Returns the RFC 822/1123 representation of a date.
|
||||
|
@ -423,7 +424,7 @@ nsHttpServer.prototype =
|
|||
{
|
||||
var input = trans.openInputStream(0, SEGMENT_SIZE, SEGMENT_COUNT)
|
||||
.QueryInterface(Ci.nsIAsyncInputStream);
|
||||
var output = trans.openOutputStream(Ci.nsITransport.OPEN_BLOCKING, 0, 0);
|
||||
var output = trans.openOutputStream(0, 0, 0);
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
|
@ -1289,7 +1290,7 @@ RequestReader.prototype =
|
|||
}
|
||||
catch (e)
|
||||
{
|
||||
if (e.result !== Cr.NS_BASE_STREAM_CLOSED)
|
||||
if (streamClosed(e))
|
||||
{
|
||||
dumpn("*** WARNING: unexpected error when reading from socket; will " +
|
||||
"be treated as if the input stream had been closed");
|
||||
|
@ -3401,8 +3402,10 @@ function Response(connection)
|
|||
this._bodyInputStream = null;
|
||||
|
||||
/**
|
||||
* The stream copier which copies data written to the body by a request
|
||||
* handler to the network.
|
||||
* A stream copier which copies data to the network. It is initially null
|
||||
* until replaced with a copier for response headers; when headers have been
|
||||
* fully sent it is replaced with a copier for the response body, remaining
|
||||
* so for the duration of response processing.
|
||||
*/
|
||||
this._asyncCopier = null;
|
||||
|
||||
|
@ -3440,7 +3443,7 @@ Response.prototype =
|
|||
|
||||
if (!this._bodyOutputStream)
|
||||
{
|
||||
var pipe = new Pipe(false, false, Response.SEGMENT_SIZE, PR_UINT32_MAX,
|
||||
var pipe = new Pipe(true, false, Response.SEGMENT_SIZE, PR_UINT32_MAX,
|
||||
null);
|
||||
this._bodyOutputStream = pipe.outputStream;
|
||||
this._bodyInputStream = pipe.inputStream;
|
||||
|
@ -3740,8 +3743,8 @@ Response.prototype =
|
|||
// the connection will be deterministically written. This makes it easier
|
||||
// to specify exact behavior, and it makes observable behavior more
|
||||
// predictable for clients. Note that the correctness of this depends on
|
||||
// callbacks in response to _waitForData in WriteThroughCopier happening
|
||||
// asynchronously with respect to the actual writing of data to
|
||||
// callbacks in response to _waitToReadData in WriteThroughCopier
|
||||
// happening asynchronously with respect to the actual writing of data to
|
||||
// bodyOutputStream, as they currently do; if they happened synchronously,
|
||||
// an event which ran before this one could write more data to the
|
||||
// response body before we get around to canceling the copier. We have
|
||||
|
@ -3781,6 +3784,37 @@ Response.prototype =
|
|||
|
||||
// PRIVATE IMPLEMENTATION
|
||||
|
||||
/**
|
||||
* Sends the status line and headers of this response if they haven't been
|
||||
* sent and initiates the process of copying data written to this response's
|
||||
* body to the network.
|
||||
*/
|
||||
_startAsyncProcessor: function()
|
||||
{
|
||||
dumpn("*** _startAsyncProcessor()");
|
||||
|
||||
// Handle cases where we're being called a second time. The former case
|
||||
// happens when this is triggered both by complete() and by processAsync(),
|
||||
// while the latter happens when processAsync() in conjunction with sent
|
||||
// data causes abort() to be called.
|
||||
if (this._asyncCopier || this._ended)
|
||||
{
|
||||
dumpn("*** ignoring second call to _startAsyncProcessor");
|
||||
return;
|
||||
}
|
||||
|
||||
// Send headers if they haven't been sent already and should be sent, then
|
||||
// asynchronously continue to send the body.
|
||||
if (this._headers && !this._powerSeized)
|
||||
{
|
||||
this._sendHeaders();
|
||||
return;
|
||||
}
|
||||
|
||||
this._headers = null;
|
||||
this._sendBody();
|
||||
},
|
||||
|
||||
/**
|
||||
* Signals that all modifications to the response status line and headers are
|
||||
* complete and then sends that data over the network to the client. Once
|
||||
|
@ -3830,7 +3864,7 @@ Response.prototype =
|
|||
dumpn("*** header post-processing completed, sending response head...");
|
||||
|
||||
// request-line
|
||||
var preamble = statusLine;
|
||||
var preambleData = [statusLine];
|
||||
|
||||
// headers
|
||||
var headEnum = headers.enumerator;
|
||||
|
@ -3841,71 +3875,82 @@ Response.prototype =
|
|||
.data;
|
||||
var values = headers.getHeaderValues(fieldName);
|
||||
for (var i = 0, sz = values.length; i < sz; i++)
|
||||
preamble += fieldName + ": " + values[i] + "\r\n";
|
||||
preambleData.push(fieldName + ": " + values[i] + "\r\n");
|
||||
}
|
||||
|
||||
// end request-line/headers
|
||||
preamble += "\r\n";
|
||||
preambleData.push("\r\n");
|
||||
|
||||
var connection = this._connection;
|
||||
try
|
||||
var preamble = preambleData.join("");
|
||||
|
||||
var responseHeadPipe = new Pipe(true, false, 0, PR_UINT32_MAX, null);
|
||||
responseHeadPipe.outputStream.write(preamble, preamble.length);
|
||||
|
||||
var response = this;
|
||||
var copyObserver =
|
||||
{
|
||||
connection.output.write(preamble, preamble.length);
|
||||
}
|
||||
catch (e)
|
||||
onStartRequest: function(request, cx)
|
||||
{
|
||||
// Connection closed already? Even if not, failure to write the response
|
||||
// means we probably will fail later anyway, so in the interests of
|
||||
// avoiding exceptions we'll (possibly) close the connection and return.
|
||||
dumpn("*** error writing headers to socket: " + e);
|
||||
dumpn("*** preamble copying started");
|
||||
},
|
||||
|
||||
onStopRequest: function(request, cx, statusCode)
|
||||
{
|
||||
dumpn("*** preamble copying complete " +
|
||||
"[status=0x" + statusCode.toString(16) + "]");
|
||||
|
||||
if (!Components.isSuccessCode(statusCode))
|
||||
{
|
||||
dumpn("!!! header copying problems: non-success statusCode, " +
|
||||
"ending response");
|
||||
|
||||
response.end();
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
response._sendBody();
|
||||
}
|
||||
},
|
||||
|
||||
QueryInterface: function(aIID)
|
||||
{
|
||||
if (aIID.equals(Ci.nsIRequestObserver) || aIID.equals(Ci.nsISupports))
|
||||
return this;
|
||||
|
||||
throw Cr.NS_ERROR_NO_INTERFACE;
|
||||
}
|
||||
};
|
||||
|
||||
var headerCopier = this._asyncCopier =
|
||||
new WriteThroughCopier(responseHeadPipe.inputStream,
|
||||
this._connection.output,
|
||||
copyObserver, null);
|
||||
|
||||
responseHeadPipe.outputStream.close();
|
||||
|
||||
// Forbid setting any more headers or modifying the request line.
|
||||
this._headers = null;
|
||||
},
|
||||
|
||||
/**
|
||||
* Sends the status line and headers of this response if they haven't been
|
||||
* sent and initiates the process of copying data written to this response's
|
||||
* body to the network.
|
||||
* Asynchronously writes the body of the response (or the entire response, if
|
||||
* seizePower() has been called) to the network.
|
||||
*/
|
||||
_startAsyncProcessor: function()
|
||||
_sendBody: function()
|
||||
{
|
||||
dumpn("*** _startAsyncProcessor()");
|
||||
dumpn("*** _sendBody");
|
||||
|
||||
// Handle cases where we're being called a second time. The former case
|
||||
// happens when this is triggered both by complete() and by processAsync(),
|
||||
// while the latter happens when processAsync() in conjunction with sent
|
||||
// data causes abort() to be called.
|
||||
if (this._asyncCopier || this._ended)
|
||||
{
|
||||
dumpn("*** ignoring second call to _startAsyncProcessor");
|
||||
return;
|
||||
}
|
||||
|
||||
// Send headers if they haven't been sent already.
|
||||
if (this._headers)
|
||||
{
|
||||
if (this._powerSeized)
|
||||
this._headers = null;
|
||||
else
|
||||
this._sendHeaders();
|
||||
NS_ASSERT(this._headers === null, "_sendHeaders() failed?");
|
||||
}
|
||||
|
||||
var response = this;
|
||||
var connection = this._connection;
|
||||
NS_ASSERT(!this._headers, "still have headers around but sending body?");
|
||||
|
||||
// If no body data was written, we're done
|
||||
if (!this._bodyInputStream)
|
||||
{
|
||||
dumpn("*** empty body, response finished");
|
||||
response.end();
|
||||
this.end();
|
||||
return;
|
||||
}
|
||||
|
||||
var response = this;
|
||||
var copyObserver =
|
||||
{
|
||||
onStartRequest: function(request, context)
|
||||
|
@ -3932,8 +3977,7 @@ Response.prototype =
|
|||
|
||||
QueryInterface: function(aIID)
|
||||
{
|
||||
if (aIID.equals(Ci.nsIRequestObserver) ||
|
||||
aIID.equals(Ci.nsISupports))
|
||||
if (aIID.equals(Ci.nsIRequestObserver) || aIID.equals(Ci.nsISupports))
|
||||
return this;
|
||||
|
||||
throw Cr.NS_ERROR_NO_INTERFACE;
|
||||
|
@ -3941,7 +3985,7 @@ Response.prototype =
|
|||
};
|
||||
|
||||
dumpn("*** starting async copier of body data...");
|
||||
var copier = this._asyncCopier =
|
||||
this._asyncCopier =
|
||||
new WriteThroughCopier(this._bodyInputStream, this._connection.output,
|
||||
copyObserver, null);
|
||||
},
|
||||
|
@ -3965,30 +4009,45 @@ function notImplemented()
|
|||
throw Cr.NS_ERROR_NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
/** Returns true iff the given exception represents stream closure. */
|
||||
function streamClosed(e)
|
||||
{
|
||||
return e === Cr.NS_BASE_STREAM_CLOSED ||
|
||||
(typeof e === "object" && e.result === Cr.NS_BASE_STREAM_CLOSED);
|
||||
}
|
||||
|
||||
/** Returns true iff the given exception represents a blocked stream. */
|
||||
function wouldBlock(e)
|
||||
{
|
||||
return e === Cr.NS_BASE_STREAM_WOULD_BLOCK ||
|
||||
(typeof e === "object" && e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copies data from input to output as it becomes available.
|
||||
* Copies data from source to sink as it becomes available, when that data can
|
||||
* be written to sink without blocking.
|
||||
*
|
||||
* @param input : nsIAsyncInputStream
|
||||
* @param source : nsIAsyncInputStream
|
||||
* the stream from which data is to be read
|
||||
* @param output : nsIOutputStream
|
||||
* @param sink : nsIAsyncOutputStream
|
||||
* the stream to which data is to be copied
|
||||
* @param observer : nsIRequestObserver
|
||||
* an observer which will be notified when the copy starts and finishes
|
||||
* @param context : nsISupports
|
||||
* context passed to observer when notified of start/stop
|
||||
* @throws NS_ERROR_NULL_POINTER
|
||||
* if input, output, or observer are null
|
||||
* if source, sink, or observer are null
|
||||
*/
|
||||
function WriteThroughCopier(input, output, observer, context)
|
||||
function WriteThroughCopier(source, sink, observer, context)
|
||||
{
|
||||
if (!input || !output || !observer)
|
||||
if (!source || !sink || !observer)
|
||||
throw Cr.NS_ERROR_NULL_POINTER;
|
||||
|
||||
/** Stream from which data is being read. */
|
||||
this._input = input;
|
||||
this._source = source;
|
||||
|
||||
/** Stream to which data is being written. */
|
||||
this._output = new BinaryOutputStream(output);
|
||||
this._sink = sink;
|
||||
|
||||
/** Observer watching this copy. */
|
||||
this._observer = observer;
|
||||
|
@ -3996,7 +4055,16 @@ function WriteThroughCopier(input, output, observer, context)
|
|||
/** Context for the observer watching this. */
|
||||
this._context = context;
|
||||
|
||||
/** False until cancel() is called, when this copy is completed. */
|
||||
/**
|
||||
* True iff this is currently being canceled (cancel has been called, the
|
||||
* callback may not yet have been made).
|
||||
*/
|
||||
this._canceled = false;
|
||||
|
||||
/**
|
||||
* False until all data has been read from input and written to output, at
|
||||
* which point this copy is completed and cancel() is asynchronously called.
|
||||
*/
|
||||
this._completed = false;
|
||||
|
||||
/** Required by nsIRequest, meaningless. */
|
||||
|
@ -4009,64 +4077,303 @@ function WriteThroughCopier(input, output, observer, context)
|
|||
/** Status of this request. */
|
||||
this.status = Cr.NS_OK;
|
||||
|
||||
/** Arrays of byte strings waiting to be written to output. */
|
||||
this._pendingData = [];
|
||||
|
||||
// start copying
|
||||
try
|
||||
{
|
||||
observer.onStartRequest(this, context);
|
||||
this._waitForData();
|
||||
this._waitToReadData();
|
||||
this._waitForSinkClosure();
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
dumpn("!!! error starting copy: " + e);
|
||||
dumpn("!!! error starting copy: " + e +
|
||||
("lineNumber" in e ? ", line " + e.lineNumber : ""));
|
||||
dumpn(e.stack);
|
||||
this.cancel(Cr.NS_ERROR_UNEXPECTED);
|
||||
}
|
||||
}
|
||||
WriteThroughCopier.prototype =
|
||||
{
|
||||
/**
|
||||
* Cancels data copying and asynchronously notifies the observer with the
|
||||
* given error code.
|
||||
*
|
||||
* @param status : nsresult
|
||||
* the status to pass to the observer when data copying has been canceled
|
||||
*/
|
||||
cancel: function(status)
|
||||
{
|
||||
dumpn("*** cancel(" + status.toString(16) + ")");
|
||||
/* nsISupports implementation */
|
||||
|
||||
if (this._completed)
|
||||
QueryInterface: function(iid)
|
||||
{
|
||||
dumpn("*** ignoring cancel on already-canceled copier...");
|
||||
return;
|
||||
if (iid.equals(Ci.nsIInputStreamCallback) ||
|
||||
iid.equals(Ci.nsIOutputStreamCallback) ||
|
||||
iid.equals(Ci.nsIRequest) ||
|
||||
iid.equals(Ci.nsISupports))
|
||||
{
|
||||
return this;
|
||||
}
|
||||
|
||||
this._completed = true;
|
||||
this.status = status;
|
||||
throw Cr.NS_ERROR_NO_INTERFACE;
|
||||
},
|
||||
|
||||
var self = this;
|
||||
var cancelEvent =
|
||||
|
||||
// NSIINPUTSTREAMCALLBACK
|
||||
|
||||
/**
|
||||
* Receives a more-data-in-input notification and writes the corresponding
|
||||
* data to the output.
|
||||
*
|
||||
* @param input : nsIAsyncInputStream
|
||||
* the input stream on whose data we have been waiting
|
||||
*/
|
||||
onInputStreamReady: function(input)
|
||||
{
|
||||
run: function()
|
||||
{
|
||||
dumpn("*** onStopRequest async callback");
|
||||
if (this._source === null)
|
||||
return;
|
||||
|
||||
dumpn("*** onInputStreamReady");
|
||||
|
||||
//
|
||||
// Ordinarily we'll read a non-zero amount of data from input, queue it up
|
||||
// to be written and then wait for further callbacks. The complications in
|
||||
// this method are the cases where we deviate from that behavior when errors
|
||||
// occur or when copying is drawing to a finish.
|
||||
//
|
||||
// The edge cases when reading data are:
|
||||
//
|
||||
// Zero data is read
|
||||
// If zero data was read, we're at the end of available data, so we can
|
||||
// should stop reading and move on to writing out what we have (or, if
|
||||
// we've already done that, onto notifying of completion).
|
||||
// A stream-closed exception is thrown
|
||||
// This is effectively a less kind version of zero data being read; the
|
||||
// only difference is that we notify of completion with that result
|
||||
// rather than with NS_OK.
|
||||
// Some other exception is thrown
|
||||
// This is the least kind result. We don't know what happened, so we
|
||||
// act as though the stream closed except that we notify of completion
|
||||
// with the result NS_ERROR_UNEXPECTED.
|
||||
//
|
||||
|
||||
var bytesWanted = 0, bytesConsumed = -1;
|
||||
try
|
||||
{
|
||||
self._observer.onStopRequest(self, self._context, self.status);
|
||||
input = new BinaryInputStream(input);
|
||||
|
||||
bytesWanted = Math.min(input.available(), Response.SEGMENT_SIZE);
|
||||
dumpn("*** input wanted: " + bytesWanted);
|
||||
|
||||
if (bytesWanted > 0)
|
||||
{
|
||||
var data = input.readByteArray(bytesWanted);
|
||||
bytesConsumed = data.length;
|
||||
this._pendingData.push(String.fromCharCode.apply(String, data));
|
||||
}
|
||||
|
||||
dumpn("*** " + bytesConsumed + " bytes read");
|
||||
|
||||
// Handle the zero-data edge case in the same place as all other edge
|
||||
// cases are handled.
|
||||
if (bytesWanted === 0)
|
||||
throw Cr.NS_BASE_STREAM_CLOSED;
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
NS_ASSERT(false, "how are we throwing an exception here? " + e);
|
||||
if (streamClosed(e))
|
||||
{
|
||||
dumpn("*** input stream closed");
|
||||
e = bytesWanted === 0 ? Cr.NS_OK : Cr.NS_ERROR_UNEXPECTED;
|
||||
}
|
||||
else
|
||||
{
|
||||
dumpn("!!! unexpected error reading from input, canceling: " + e);
|
||||
e = Cr.NS_ERROR_UNEXPECTED;
|
||||
}
|
||||
|
||||
this._doneReadingSource(e);
|
||||
return;
|
||||
}
|
||||
|
||||
var pendingData = this._pendingData;
|
||||
|
||||
NS_ASSERT(bytesConsumed > 0);
|
||||
NS_ASSERT(pendingData.length > 0, "no pending data somehow?");
|
||||
NS_ASSERT(pendingData[pendingData.length - 1].length > 0,
|
||||
"buffered zero bytes of data?");
|
||||
|
||||
NS_ASSERT(this._source !== null);
|
||||
|
||||
// Reading has gone great, and we've gotten data to write now. What if we
|
||||
// don't have a place to write that data, because output went away just
|
||||
// before this read? Drop everything on the floor, including new data, and
|
||||
// cancel at this point.
|
||||
if (this._sink === null)
|
||||
{
|
||||
pendingData.length = 0;
|
||||
this._doneReadingSource(Cr.NS_ERROR_UNEXPECTED);
|
||||
return;
|
||||
}
|
||||
|
||||
// Okay, we've read the data, and we know we have a place to write it. We
|
||||
// need to queue up the data to be written, but *only* if none is queued
|
||||
// already -- if data's already queued, the code that actually writes the
|
||||
// data will make sure to wait on unconsumed pending data.
|
||||
try
|
||||
{
|
||||
if (pendingData.length === 1)
|
||||
this._waitToWriteData();
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
dumpn("!!! error waiting to write data just read, swallowing and " +
|
||||
"writing only what we already have: " + e);
|
||||
this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED);
|
||||
return;
|
||||
}
|
||||
|
||||
// Whee! We successfully read some data, and it's successfully queued up to
|
||||
// be written. All that remains now is to wait for more data to read.
|
||||
try
|
||||
{
|
||||
this._waitToReadData();
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
dumpn("!!! error waiting to read more data: " + e);
|
||||
this._doneReadingSource(Cr.NS_ERROR_UNEXPECTED);
|
||||
}
|
||||
};
|
||||
gThreadManager.currentThread
|
||||
.dispatch(cancelEvent, Ci.nsIThread.DISPATCH_NORMAL);
|
||||
},
|
||||
|
||||
|
||||
// NSIOUTPUTSTREAMCALLBACK
|
||||
|
||||
/**
|
||||
* Returns true if the provided input hasn't been fully consumed and cancel()
|
||||
* hasn't been called.
|
||||
* Callback when data may be written to the output stream without blocking, or
|
||||
* when the output stream has been closed.
|
||||
*
|
||||
* @param output : nsIAsyncOutputStream
|
||||
* the output stream on whose writability we've been waiting, also known as
|
||||
* this._sink
|
||||
*/
|
||||
onOutputStreamReady: function(output)
|
||||
{
|
||||
if (this._sink === null)
|
||||
return;
|
||||
|
||||
dumpn("*** onOutputStreamReady");
|
||||
|
||||
var pendingData = this._pendingData;
|
||||
if (pendingData.length === 0)
|
||||
{
|
||||
// There's no pending data to write. The only way this can happen is if
|
||||
// we're waiting on the output stream's closure, so we can respond to a
|
||||
// copying failure as quickly as possible (rather than waiting for data to
|
||||
// be available to read and then fail to be copied). Therefore, we must
|
||||
// be done now -- don't bother to attempt to write anything and wrap
|
||||
// things up.
|
||||
dumpn("!!! output stream closed prematurely, ending copy");
|
||||
|
||||
this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
NS_ASSERT(pendingData[0].length > 0, "queued up an empty quantum?");
|
||||
|
||||
//
|
||||
// Write out the first pending quantum of data. The possible errors here
|
||||
// are:
|
||||
//
|
||||
// The write might fail because we can't write that much data
|
||||
// Okay, we've written what we can now, so re-queue what's left and
|
||||
// finish writing it out later.
|
||||
// The write failed because the stream was closed
|
||||
// Discard pending data that we can no longer write, stop reading, and
|
||||
// signal that copying finished.
|
||||
// Some other error occurred.
|
||||
// Same as if the stream were closed, but notify with the status
|
||||
// NS_ERROR_UNEXPECTED so the observer knows something was wonky.
|
||||
//
|
||||
|
||||
try
|
||||
{
|
||||
var quantum = pendingData[0];
|
||||
|
||||
// XXX |quantum| isn't guaranteed to be ASCII, so we're relying on
|
||||
// undefined behavior! We're only using this because writeByteArray
|
||||
// is unusably broken for asynchronous output streams; see bug 532834
|
||||
// for details.
|
||||
var bytesWritten = output.write(quantum, quantum.length);
|
||||
if (bytesWritten === quantum.length)
|
||||
pendingData.shift();
|
||||
else
|
||||
pendingData[0] = quantum.substring(bytesWritten);
|
||||
|
||||
dumpn("*** wrote " + bytesWritten + " bytes of data");
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
if (wouldBlock(e))
|
||||
{
|
||||
NS_ASSERT(pendingData.length > 0,
|
||||
"stream-blocking exception with no data to write?");
|
||||
NS_ASSERT(pendingData[0].length > 0,
|
||||
"stream-blocking exception with empty quantum?");
|
||||
this._waitToWriteData();
|
||||
return;
|
||||
}
|
||||
|
||||
if (streamClosed(e))
|
||||
dumpn("!!! output stream prematurely closed, signaling error...");
|
||||
else
|
||||
dumpn("!!! unknown error: " + e + ", quantum=" + quantum);
|
||||
|
||||
this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED);
|
||||
return;
|
||||
}
|
||||
|
||||
// The day is ours! Quantum written, now let's see if we have more data
|
||||
// still to write.
|
||||
try
|
||||
{
|
||||
if (pendingData.length > 0)
|
||||
{
|
||||
this._waitToWriteData();
|
||||
return;
|
||||
}
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
dumpn("!!! unexpected error waiting to write pending data: " + e);
|
||||
this._doneWritingToSink(Cr.NS_ERROR_UNEXPECTED);
|
||||
return;
|
||||
}
|
||||
|
||||
// Okay, we have no more pending data to write -- but might we get more in
|
||||
// the future?
|
||||
if (this._source !== null)
|
||||
{
|
||||
/*
|
||||
* If we might, then wait for the output stream to be closed. (We wait
|
||||
* only for closure because we have no data to write -- and if we waited
|
||||
* for a specific amount of data, we would get repeatedly notified for no
|
||||
* reason if over time the output stream permitted more and more data to
|
||||
* be written to it without blocking.)
|
||||
*/
|
||||
this._waitForSinkClosure();
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* On the other hand, if we can't have more data because the input
|
||||
* stream's gone away, then it's time to notify of copy completion.
|
||||
* Victory!
|
||||
*/
|
||||
this._sink = null;
|
||||
this._cancelOrDispatchCancelCallback(Cr.NS_OK);
|
||||
}
|
||||
},
|
||||
|
||||
|
||||
// NSIREQUEST
|
||||
|
||||
/** Returns true if the cancel observer hasn't been notified yet. */
|
||||
isPending: function()
|
||||
{
|
||||
return !this._completed;
|
||||
|
@ -4078,66 +4385,191 @@ WriteThroughCopier.prototype =
|
|||
resume: notImplemented,
|
||||
|
||||
/**
|
||||
* Receives a more-data-in-input notification and writes the corresponding
|
||||
* data to the output.
|
||||
* Cancels data reading from input, asynchronously writes out any pending
|
||||
* data, and causes the observer to be notified with the given error code when
|
||||
* all writing has finished.
|
||||
*
|
||||
* @param status : nsresult
|
||||
* the status to pass to the observer when data copying has been canceled
|
||||
*/
|
||||
onInputStreamReady: function(input)
|
||||
cancel: function(status)
|
||||
{
|
||||
dumpn("*** onInputStreamReady");
|
||||
if (this._completed)
|
||||
dumpn("*** cancel(" + status.toString(16) + ")");
|
||||
|
||||
if (this._canceled)
|
||||
{
|
||||
dumpn("*** ignoring stream-ready callback on a canceled copier...");
|
||||
dumpn("*** suppressing a late cancel");
|
||||
return;
|
||||
}
|
||||
|
||||
input = new BinaryInputStream(input);
|
||||
this._canceled = true;
|
||||
this.status = status;
|
||||
|
||||
// We could be in the middle of absolutely anything at this point. Both
|
||||
// input and output might still be around, we might have pending data to
|
||||
// write, and in general we know nothing about the state of the world. We
|
||||
// therefore must assume everything's in progress and take everything to its
|
||||
// final steady state (or so far as it can go before we need to finish
|
||||
// writing out remaining data).
|
||||
|
||||
this._doneReadingSource(status);
|
||||
},
|
||||
|
||||
|
||||
// PRIVATE IMPLEMENTATION
|
||||
|
||||
/**
|
||||
* Stop reading input if we haven't already done so, passing e as the status
|
||||
* when closing the stream, and kick off a copy-completion notice if no more
|
||||
* data remains to be written.
|
||||
*
|
||||
* @param e : nsresult
|
||||
* the status to be used when closing the input stream
|
||||
*/
|
||||
_doneReadingSource: function(e)
|
||||
{
|
||||
dumpn("*** _doneReadingSource(0x" + e.toString(16) + ")");
|
||||
|
||||
this._finishSource(e);
|
||||
if (this._pendingData.length === 0)
|
||||
this._sink = null;
|
||||
else
|
||||
NS_ASSERT(this._sink !== null, "null output?");
|
||||
|
||||
// If we've written out all data read up to this point, then it's time to
|
||||
// signal completion.
|
||||
if (this._sink === null)
|
||||
{
|
||||
NS_ASSERT(this._pendingData.length === 0, "pending data still?");
|
||||
this._cancelOrDispatchCancelCallback(e);
|
||||
}
|
||||
},
|
||||
|
||||
/**
|
||||
* Stop writing output if we haven't already done so, discard any data that
|
||||
* remained to be sent, close off input if it wasn't already closed, and kick
|
||||
* off a copy-completion notice.
|
||||
*
|
||||
* @param e : nsresult
|
||||
* the status to be used when closing input if it wasn't already closed
|
||||
*/
|
||||
_doneWritingToSink: function(e)
|
||||
{
|
||||
dumpn("*** _doneWritingToSink(0x" + e.toString(16) + ")");
|
||||
|
||||
this._pendingData.length = 0;
|
||||
this._sink = null;
|
||||
this._doneReadingSource(e);
|
||||
},
|
||||
|
||||
/**
|
||||
* Completes processing of this copy: either by canceling the copy if it
|
||||
* hasn't already been canceled using the provided status, or by dispatching
|
||||
* the cancel callback event (with the originally provided status, of course)
|
||||
* if it already has been canceled.
|
||||
*
|
||||
* @param status : nsresult
|
||||
* the status code to use to cancel this, if this hasn't already been
|
||||
* canceled
|
||||
*/
|
||||
_cancelOrDispatchCancelCallback: function(status)
|
||||
{
|
||||
dumpn("*** _cancelOrDispatchCancelCallback(" + status + ")");
|
||||
|
||||
NS_ASSERT(this._source === null, "should have finished input");
|
||||
NS_ASSERT(this._sink === null, "should have finished output");
|
||||
NS_ASSERT(this._pendingData.length === 0, "should have no pending data");
|
||||
|
||||
if (!this._canceled)
|
||||
{
|
||||
this.cancel(status);
|
||||
return;
|
||||
}
|
||||
|
||||
var self = this;
|
||||
var event =
|
||||
{
|
||||
run: function()
|
||||
{
|
||||
dumpn("*** onStopRequest async callback");
|
||||
|
||||
self._completed = true;
|
||||
try
|
||||
{
|
||||
var avail = input.available();
|
||||
var data = input.readByteArray(avail);
|
||||
this._output.writeByteArray(data, data.length);
|
||||
self._observer.onStopRequest(self, self._context, self.status);
|
||||
}
|
||||
catch (e)
|
||||
{
|
||||
if (e === Cr.NS_BASE_STREAM_CLOSED ||
|
||||
e.result === Cr.NS_BASE_STREAM_CLOSED)
|
||||
{
|
||||
this.cancel(Cr.NS_OK);
|
||||
NS_ASSERT(false,
|
||||
"how are we throwing an exception here? we control " +
|
||||
"all the callers! " + e);
|
||||
}
|
||||
else
|
||||
{
|
||||
dumpn("!!! error copying from input to output: " + e);
|
||||
this.cancel(Cr.NS_ERROR_UNEXPECTED);
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if (avail === 0)
|
||||
this.cancel(Cr.NS_OK);
|
||||
else
|
||||
this._waitForData();
|
||||
gThreadManager.currentThread.dispatch(event, Ci.nsIThread.DISPATCH_NORMAL);
|
||||
},
|
||||
|
||||
/**
|
||||
* Kicks off another wait for more data to be available from the input stream.
|
||||
*/
|
||||
_waitForData: function()
|
||||
_waitToReadData: function()
|
||||
{
|
||||
dumpn("*** _waitForData");
|
||||
this._input.asyncWait(this, 0, 1, gThreadManager.mainThread);
|
||||
dumpn("*** _waitToReadData");
|
||||
this._source.asyncWait(this, 0, Response.SEGMENT_SIZE,
|
||||
gThreadManager.mainThread);
|
||||
},
|
||||
|
||||
/** nsISupports implementation */
|
||||
QueryInterface: function(iid)
|
||||
/**
|
||||
* Kicks off another wait until data can be written to the output stream.
|
||||
*/
|
||||
_waitToWriteData: function()
|
||||
{
|
||||
if (iid.equals(Ci.nsIRequest) ||
|
||||
iid.equals(Ci.nsISupports) ||
|
||||
iid.equals(Ci.nsIInputStreamCallback))
|
||||
{
|
||||
return this;
|
||||
}
|
||||
dumpn("*** _waitToWriteData");
|
||||
|
||||
throw Cr.NS_ERROR_NO_INTERFACE;
|
||||
var pendingData = this._pendingData;
|
||||
NS_ASSERT(pendingData.length > 0, "no pending data to write?");
|
||||
NS_ASSERT(pendingData[0].length > 0, "buffered an empty write?");
|
||||
|
||||
this._sink.asyncWait(this, 0, pendingData[0].length,
|
||||
gThreadManager.mainThread);
|
||||
},
|
||||
|
||||
/**
|
||||
* Kicks off a wait for the sink to which data is being copied to be closed.
|
||||
* We wait for stream closure when we don't have any data to be copied, rather
|
||||
* than waiting to write a specific amount of data. We can't wait to write
|
||||
* data because the sink might be infinitely writable, and if no data appears
|
||||
* in the source for a long time we might have to spin quite a bit waiting to
|
||||
* write, waiting to write again, &c. Waiting on stream closure instead means
|
||||
* we'll get just one notification if the sink dies. Note that when data
|
||||
* starts arriving from the sink we'll resume waiting for data to be written,
|
||||
* dropping this closure-only callback entirely.
|
||||
*/
|
||||
_waitForSinkClosure: function()
|
||||
{
|
||||
dumpn("*** _waitForSinkClosure");
|
||||
|
||||
this._sink.asyncWait(this, Ci.nsIAsyncOutputStream.WAIT_CLOSURE_ONLY, 0,
|
||||
gThreadManager.mainThread);
|
||||
},
|
||||
|
||||
/**
|
||||
* Closes input with the given status, if it hasn't already been closed;
|
||||
* otherwise a no-op.
|
||||
*
|
||||
* @param status : nsresult
|
||||
* status code use to close the source stream if necessary
|
||||
*/
|
||||
_finishSource: function(status)
|
||||
{
|
||||
dumpn("*** _finishSource(" + status.toString(16) + ")");
|
||||
|
||||
if (this._source !== null)
|
||||
{
|
||||
this._source.closeWithStatus(status);
|
||||
this._source = null;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -362,28 +362,29 @@ interface nsIHttpServerIdentity : nsISupports
|
|||
interface nsIHttpRequestHandler : nsISupports
|
||||
{
|
||||
/**
|
||||
* Processes the HTTP request represented by metadata and initializes the
|
||||
* passed-in response to reflect the correct HTTP response.
|
||||
* Processes an HTTP request and initializes the passed-in response to reflect
|
||||
* the correct HTTP response.
|
||||
*
|
||||
* If this method throws an exception, externally observable behavior depends
|
||||
* upon whether is being processed asynchronously and the connection has had
|
||||
* any data written to it (even an explicit zero bytes of data being written)
|
||||
* or whether seizePower() has been called on it. If such has happened, sent
|
||||
* data will be exactly that data written at the time the exception was
|
||||
* thrown. If no data has been written, the response has not had seizePower()
|
||||
* called on it, and it is not being asynchronously created, an error handler
|
||||
* will be invoked (usually 500 unless otherwise specified). Note that some
|
||||
* uses of nsIHttpRequestHandler may require this method to never throw an
|
||||
* exception; in the general case, however, this method may throw an exception
|
||||
* (causing an HTTP 500 response to occur).
|
||||
* upon whether is being processed asynchronously. If such is the case, the
|
||||
* output is some prefix (perhaps all, perhaps none, perhaps only some) of the
|
||||
* data which would have been sent if, instead, the response had been finished
|
||||
* at that point. If no data has been written, the response has not had
|
||||
* seizePower() called on it, and it is not being asynchronously created, an
|
||||
* error handler will be invoked (usually 500 unless otherwise specified).
|
||||
*
|
||||
* @param metadata
|
||||
* Some uses of nsIHttpRequestHandler may require this method to never throw
|
||||
* an exception; in the general case, however, this method may throw an
|
||||
* exception (causing an HTTP 500 response to occur, if the above conditions
|
||||
* are met).
|
||||
*
|
||||
* @param request
|
||||
* data representing an HTTP request
|
||||
* @param response
|
||||
* an initially-empty response which must be modified to reflect the data
|
||||
* which should be sent as the response to the request described by metadata
|
||||
*/
|
||||
void handle(in nsIHttpRequest metadata, in nsIHttpResponse response);
|
||||
void handle(in nsIHttpRequest request, in nsIHttpResponse response);
|
||||
};
|
||||
|
||||
|
||||
|
@ -556,12 +557,6 @@ interface nsIHttpResponse : nsISupports
|
|||
* both in the server socket and in the client may delay written data; be
|
||||
* prepared for delays at any time.
|
||||
*
|
||||
* @note
|
||||
* Although in the asynchronous cases writes to the underlying transport
|
||||
* are not buffered, care must still be taken not to block for too long on
|
||||
* any such writes; it is even possible for deadlock to occur in the case
|
||||
* that the server and the client reside in the same process. Write data in
|
||||
* small chunks if necessary to avoid this problem.
|
||||
* @throws NS_ERROR_NOT_AVAILABLE
|
||||
* if accessed after this response is fully constructed
|
||||
*/
|
||||
|
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -48,6 +48,8 @@ var tests =
|
|||
null, start_functionHandler, null),
|
||||
new Test("http://localhost:4444/non-existent-path",
|
||||
null, start_non_existent_path, null),
|
||||
new Test("http://localhost:4444/lotsOfHeaders",
|
||||
null, start_lots_of_headers, null),
|
||||
];
|
||||
|
||||
function run_test()
|
||||
|
@ -64,12 +66,14 @@ function run_test()
|
|||
// register a few test paths
|
||||
srv.registerPathHandler("/objHandler", objHandler);
|
||||
srv.registerPathHandler("/functionHandler", functionHandler);
|
||||
srv.registerPathHandler("/lotsOfHeaders", lotsOfHeadersHandler);
|
||||
|
||||
srv.start(4444);
|
||||
|
||||
runHttpTests(tests, testComplete(srv));
|
||||
}
|
||||
|
||||
const HEADER_COUNT = 1000;
|
||||
|
||||
// TEST DATA
|
||||
|
||||
|
@ -122,6 +126,16 @@ function start_non_existent_path(ch, cx)
|
|||
do_check_false(ch.requestSucceeded);
|
||||
}
|
||||
|
||||
function start_lots_of_headers(ch, cx)
|
||||
{
|
||||
commonCheck(ch);
|
||||
|
||||
do_check_eq(ch.responseStatus, 200);
|
||||
do_check_true(ch.requestSucceeded);
|
||||
|
||||
for (var i = 0; i < HEADER_COUNT; i++)
|
||||
do_check_eq(ch.getResponseHeader("X-Header-" + i), "value " + i);
|
||||
}
|
||||
|
||||
// PATH HANDLERS
|
||||
|
||||
|
@ -175,3 +189,12 @@ function functionHandler(metadata, response)
|
|||
var body = "this is text\n";
|
||||
response.bodyOutputStream.write(body, body.length);
|
||||
}
|
||||
|
||||
// /lotsOfHeaders
|
||||
function lotsOfHeadersHandler(request, response)
|
||||
{
|
||||
response.setHeader("Content-Type", "text/plain", false);
|
||||
|
||||
for (var i = 0; i < HEADER_COUNT; i++)
|
||||
response.setHeader("X-Header-" + i, "value " + i, false);
|
||||
}
|
||||
|
|
|
@ -271,43 +271,6 @@ test = new Test(PREPATH + "/handleAsync2",
|
|||
tests.push(test);
|
||||
|
||||
|
||||
const ASYNC_ERROR_BODY = "hi, I'm a body!";
|
||||
|
||||
function handleAsyncError(request, response)
|
||||
{
|
||||
response.setStatusLine(request.httpVersion, 200, "Async Error");
|
||||
response.setHeader("X-Foo", "header value", false);
|
||||
|
||||
response.processAsync();
|
||||
|
||||
response.write(ASYNC_ERROR_BODY, ASYNC_ERROR_BODY.length);
|
||||
|
||||
// No turning back now -- except if there's an error!
|
||||
throw "Monkey wrench!";
|
||||
}
|
||||
handlers["/handleAsyncError"] = handleAsyncError;
|
||||
|
||||
function start_handleAsyncError(ch, cx)
|
||||
{
|
||||
do_check_eq(ch.responseStatus, 200);
|
||||
do_check_eq(ch.responseStatusText, "Async Error");
|
||||
do_check_eq(ch.getResponseHeader("X-Foo"), "header value");
|
||||
}
|
||||
|
||||
function stop_handleAsyncError(ch, cx, status, data)
|
||||
{
|
||||
// Lies! But not really!
|
||||
do_check_true(ch.requestSucceeded);
|
||||
|
||||
do_check_eq(data.length, ASYNC_ERROR_BODY.length);
|
||||
do_check_eq(String.fromCharCode.apply(null, data), ASYNC_ERROR_BODY);
|
||||
}
|
||||
|
||||
test = new Test(PREPATH + "/handleAsyncError",
|
||||
null, start_handleAsyncError, stop_handleAsyncError);
|
||||
tests.push(test);
|
||||
|
||||
|
||||
/*
|
||||
* Tests that accessing output stream *before* calling processAsync() works
|
||||
* correctly, sending written data immediately as it is written, not buffering
|
||||
|
|
|
@ -54,9 +54,6 @@ function run_test()
|
|||
srv.registerPathHandler("/exceptions", handleExceptions);
|
||||
srv.registerPathHandler("/async-seizure", handleAsyncSeizure);
|
||||
srv.registerPathHandler("/seize-after-async", handleSeizeAfterAsync);
|
||||
srv.registerPathHandler("/thrown-exception", handleThrownException);
|
||||
srv.registerPathHandler("/asap-later-write", handleASAPLaterWrite);
|
||||
srv.registerPathHandler("/asap-later-finish", handleASAPLaterFinish);
|
||||
|
||||
srv.start(PORT);
|
||||
|
||||
|
@ -165,51 +162,6 @@ function handleSeizeAfterAsync(request, response)
|
|||
});
|
||||
}
|
||||
|
||||
function handleThrownException(request, response)
|
||||
{
|
||||
if (request.queryString === "writeBefore")
|
||||
response.write("ignore this");
|
||||
else if (request.queryString === "writeBeforeEmpty")
|
||||
response.write("");
|
||||
else if (request.queryString !== "")
|
||||
throw "query string FAIL";
|
||||
response.seizePower();
|
||||
response.write("preparing to throw...");
|
||||
throw "badness 10000";
|
||||
}
|
||||
|
||||
function handleASAPLaterWrite(request, response)
|
||||
{
|
||||
response.seizePower();
|
||||
response.write("should only ");
|
||||
response.write("see this");
|
||||
|
||||
callASAPLater(function()
|
||||
{
|
||||
response.write("...and not this");
|
||||
callASAPLater(function()
|
||||
{
|
||||
response.write("...or this");
|
||||
response.finish();
|
||||
});
|
||||
});
|
||||
|
||||
throw "opening pitch of the ballgame";
|
||||
}
|
||||
|
||||
function handleASAPLaterFinish(request, response)
|
||||
{
|
||||
response.seizePower();
|
||||
response.write("should only see this");
|
||||
|
||||
callASAPLater(function()
|
||||
{
|
||||
response.finish();
|
||||
});
|
||||
|
||||
throw "out the bum!";
|
||||
}
|
||||
|
||||
|
||||
/***************
|
||||
* BEGIN TESTS *
|
||||
|
@ -262,48 +214,3 @@ function checkSeizeAfterAsync(data)
|
|||
}
|
||||
test = new RawTest("localhost", PORT, data, checkSeizeAfterAsync),
|
||||
tests.push(test);
|
||||
|
||||
data = "GET /thrown-exception?writeBefore HTTP/1.0\r\n" +
|
||||
"\r\n";
|
||||
function checkThrownExceptionWriteBefore(data)
|
||||
{
|
||||
do_check_eq(data, "preparing to throw...");
|
||||
}
|
||||
test = new RawTest("localhost", PORT, data, checkThrownExceptionWriteBefore),
|
||||
tests.push(test);
|
||||
|
||||
data = "GET /thrown-exception?writeBeforeEmpty HTTP/1.0\r\n" +
|
||||
"\r\n";
|
||||
function checkThrownExceptionWriteBeforeEmpty(data)
|
||||
{
|
||||
do_check_eq(data, "preparing to throw...");
|
||||
}
|
||||
test = new RawTest("localhost", PORT, data, checkThrownExceptionWriteBeforeEmpty),
|
||||
tests.push(test);
|
||||
|
||||
data = "GET /thrown-exception HTTP/1.0\r\n" +
|
||||
"\r\n";
|
||||
function checkThrownException(data)
|
||||
{
|
||||
do_check_eq(data, "preparing to throw...");
|
||||
}
|
||||
test = new RawTest("localhost", PORT, data, checkThrownException),
|
||||
tests.push(test);
|
||||
|
||||
data = "GET /asap-later-write HTTP/1.0\r\n" +
|
||||
"\r\n";
|
||||
function checkASAPLaterWrite(data)
|
||||
{
|
||||
do_check_eq(data, "should only see this");
|
||||
}
|
||||
test = new RawTest("localhost", PORT, data, checkASAPLaterWrite),
|
||||
tests.push(test);
|
||||
|
||||
data = "GET /asap-later-finish HTTP/1.0\r\n" +
|
||||
"\r\n";
|
||||
function checkASAPLaterFinish(data)
|
||||
{
|
||||
do_check_eq(data, "should only see this");
|
||||
}
|
||||
test = new RawTest("localhost", PORT, data, checkASAPLaterFinish),
|
||||
tests.push(test);
|
||||
|
|
Загрузка…
Ссылка в новой задаче