Bug 1356237 - Ship a copy of transport module in marionette. r=ato

DevTools is about to move out of mozilla-central and be released as an add-on.
So that these protocol files are going to be missing from the tree.
To accommodate that, we are doing a copy of them next to marionette.

MozReview-Commit-ID: 9PyhuwyZyXI

--HG--
extra : rebase_source : b6d96ae5e4c4ac837713e396ab72163b168871f2
This commit is contained in:
Alexandre Poirot 2017-04-24 20:23:04 +02:00
Родитель 8767e6b8cb
Коммит eda858a388
5 изменённых файлов: 1539 добавлений и 2 удалений

Просмотреть файл

@ -32,6 +32,9 @@ marionette.jar:
content/assert.js (assert.js)
content/addon.js (addon.js)
content/session.js (session.js)
content/transport.js (transport.js)
content/packets.js (packets.js)
content/stream-utils.js (stream-utils.js)
#ifdef ENABLE_TESTS
content/test.xul (chrome/test.xul)
content/test2.xul (chrome/test2.xul)

Просмотреть файл

@ -0,0 +1,397 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/**
* Packets contain read / write functionality for the different packet types
* supported by the debugging protocol, so that a transport can focus on
* delivery and queue management without worrying too much about the specific
* packet types.
*
* They are intended to be "one use only", so a new packet should be
* instantiated for each incoming or outgoing packet.
*
* A complete Packet type should expose at least the following:
* * read(stream, scriptableStream)
* Called when the input stream has data to read
* * write(stream)
* Called when the output stream is ready to write
* * get done()
* Returns true once the packet is done being read / written
* * destroy()
* Called to clean up at the end of use
*/
const {classes: Cc, interfaces: Ci, utils: Cu} = Components;
const { StreamUtils } = Cu.import("chrome://marionette/content/stream-utils.js");
const unicodeConverter = Cc["@mozilla.org/intl/scriptableunicodeconverter"]
.createInstance(Ci.nsIScriptableUnicodeConverter);
unicodeConverter.charset = "UTF-8";
const defer = function () {
let deferred = {
promise: new Promise((resolve, reject) => {
deferred.resolve = resolve;
deferred.reject = reject;
})
};
return deferred;
};
this.EXPORTED_SYMBOLS = ["RawPacket", "Packet", "JSONPacket", "BulkPacket"];
// The transport's previous check ensured the header length did not exceed 20
// characters. Here, we opt for the somewhat smaller, but still large limit of
// 1 TiB.
const PACKET_LENGTH_MAX = Math.pow(2, 40);
/**
* A generic Packet processing object (extended by two subtypes below).
*/
function Packet(transport) {
this._transport = transport;
this._length = 0;
}
/**
* Attempt to initialize a new Packet based on the incoming packet header we've
* received so far. We try each of the types in succession, trying JSON packets
* first since they are much more common.
* @param header string
* The packet header string to attempt parsing.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
* @return Packet
* The parsed packet of the matching type, or null if no types matched.
*/
Packet.fromHeader = function (header, transport) {
return JSONPacket.fromHeader(header, transport) ||
BulkPacket.fromHeader(header, transport);
};
Packet.prototype = {
get length() {
return this._length;
},
set length(length) {
if (length > PACKET_LENGTH_MAX) {
throw Error("Packet length " + length + " exceeds the max length of " +
PACKET_LENGTH_MAX);
}
this._length = length;
},
destroy: function () {
this._transport = null;
}
};
/**
* With a JSON packet (the typical packet type sent via the transport), data is
* transferred as a JSON packet serialized into a string, with the string length
* prepended to the packet, followed by a colon ([length]:[packet]). The
* contents of the JSON packet are specified in the Remote Debugging Protocol
* specification.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
*/
function JSONPacket(transport) {
Packet.call(this, transport);
this._data = "";
this._done = false;
}
/**
* Attempt to initialize a new JSONPacket based on the incoming packet header
* we've received so far.
* @param header string
* The packet header string to attempt parsing.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
* @return JSONPacket
* The parsed packet, or null if it's not a match.
*/
JSONPacket.fromHeader = function (header, transport) {
let match = this.HEADER_PATTERN.exec(header);
if (!match) {
return null;
}
let packet = new JSONPacket(transport);
packet.length = +match[1];
return packet;
};
JSONPacket.HEADER_PATTERN = /^(\d+):$/;
JSONPacket.prototype = Object.create(Packet.prototype);
Object.defineProperty(JSONPacket.prototype, "object", {
/**
* Gets the object (not the serialized string) being read or written.
*/
get: function () {
return this._object;
},
/**
* Sets the object to be sent when write() is called.
*/
set: function (object) {
this._object = object;
let data = JSON.stringify(object);
this._data = unicodeConverter.ConvertFromUnicode(data);
this.length = this._data.length;
}
});
JSONPacket.prototype.read = function (stream, scriptableStream) {
// Read in more packet data.
this._readData(stream, scriptableStream);
if (!this.done) {
// Don't have a complete packet yet.
return;
}
let json = this._data;
try {
json = unicodeConverter.ConvertToUnicode(json);
this._object = JSON.parse(json);
} catch (e) {
let msg = "Error parsing incoming packet: " + json + " (" + e +
" - " + e.stack + ")";
console.error(msg);
dump(msg + "\n");
return;
}
this._transport._onJSONObjectReady(this._object);
};
JSONPacket.prototype._readData = function (stream, scriptableStream) {
let bytesToRead = Math.min(this.length - this._data.length,
stream.available());
this._data += scriptableStream.readBytes(bytesToRead);
this._done = this._data.length === this.length;
};
JSONPacket.prototype.write = function (stream) {
if (this._outgoing === undefined) {
// Format the serialized packet to a buffer
this._outgoing = this.length + ":" + this._data;
}
let written = stream.write(this._outgoing, this._outgoing.length);
this._outgoing = this._outgoing.slice(written);
this._done = !this._outgoing.length;
};
Object.defineProperty(JSONPacket.prototype, "done", {
get: function () {
return this._done;
}
});
JSONPacket.prototype.toString = function () {
return JSON.stringify(this._object, null, 2);
};
/**
* With a bulk packet, data is transferred by temporarily handing over the
* transport's input or output stream to the application layer for writing data
* directly. This can be much faster for large data sets, and avoids various
* stages of copies and data duplication inherent in the JSON packet type. The
* bulk packet looks like:
*
* bulk [actor] [type] [length]:[data]
*
* The interpretation of the data portion depends on the kind of actor and the
* packet's type. See the Remote Debugging Protocol Stream Transport spec for
* more details.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
*/
function BulkPacket(transport) {
Packet.call(this, transport);
this._done = false;
this._readyForWriting = defer();
}
/**
* Attempt to initialize a new BulkPacket based on the incoming packet header
* we've received so far.
* @param header string
* The packet header string to attempt parsing.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
* @return BulkPacket
* The parsed packet, or null if it's not a match.
*/
BulkPacket.fromHeader = function (header, transport) {
let match = this.HEADER_PATTERN.exec(header);
if (!match) {
return null;
}
let packet = new BulkPacket(transport);
packet.header = {
actor: match[1],
type: match[2],
length: +match[3]
};
return packet;
};
BulkPacket.HEADER_PATTERN = /^bulk ([^: ]+) ([^: ]+) (\d+):$/;
BulkPacket.prototype = Object.create(Packet.prototype);
BulkPacket.prototype.read = function (stream) {
// Temporarily pause monitoring of the input stream
this._transport.pauseIncoming();
let deferred = defer();
this._transport._onBulkReadReady({
actor: this.actor,
type: this.type,
length: this.length,
copyTo: (output) => {
let copying = StreamUtils.copyStream(stream, output, this.length);
deferred.resolve(copying);
return copying;
},
stream: stream,
done: deferred
});
// Await the result of reading from the stream
deferred.promise.then(() => {
this._done = true;
this._transport.resumeIncoming();
}, this._transport.close);
// Ensure this is only done once
this.read = () => {
throw new Error("Tried to read() a BulkPacket's stream multiple times.");
};
};
BulkPacket.prototype.write = function (stream) {
if (this._outgoingHeader === undefined) {
// Format the serialized packet header to a buffer
this._outgoingHeader = "bulk " + this.actor + " " + this.type + " " +
this.length + ":";
}
// Write the header, or whatever's left of it to write.
if (this._outgoingHeader.length) {
let written = stream.write(this._outgoingHeader,
this._outgoingHeader.length);
this._outgoingHeader = this._outgoingHeader.slice(written);
return;
}
// Temporarily pause the monitoring of the output stream
this._transport.pauseOutgoing();
let deferred = defer();
this._readyForWriting.resolve({
copyFrom: (input) => {
let copying = StreamUtils.copyStream(input, stream, this.length);
deferred.resolve(copying);
return copying;
},
stream: stream,
done: deferred
});
// Await the result of writing to the stream
deferred.promise.then(() => {
this._done = true;
this._transport.resumeOutgoing();
}, this._transport.close);
// Ensure this is only done once
this.write = () => {
throw new Error("Tried to write() a BulkPacket's stream multiple times.");
};
};
Object.defineProperty(BulkPacket.prototype, "streamReadyForWriting", {
get: function () {
return this._readyForWriting.promise;
}
});
Object.defineProperty(BulkPacket.prototype, "header", {
get: function () {
return {
actor: this.actor,
type: this.type,
length: this.length
};
},
set: function (header) {
this.actor = header.actor;
this.type = header.type;
this.length = header.length;
},
});
Object.defineProperty(BulkPacket.prototype, "done", {
get: function () {
return this._done;
},
});
BulkPacket.prototype.toString = function () {
return "Bulk: " + JSON.stringify(this.header, null, 2);
};
/**
* RawPacket is used to test the transport's error handling of malformed
* packets, by writing data directly onto the stream.
* @param transport DebuggerTransport
* The transport instance that will own the packet.
* @param data string
* The raw string to send out onto the stream.
*/
function RawPacket(transport, data) {
Packet.call(this, transport);
this._data = data;
this.length = data.length;
this._done = false;
}
RawPacket.prototype = Object.create(Packet.prototype);
RawPacket.prototype.read = function (stream) {
// This hasn't yet been needed for testing.
throw Error("Not implmented.");
};
RawPacket.prototype.write = function (stream) {
let written = stream.write(this._data, this._data.length);
this._data = this._data.slice(written);
this._done = !this._data.length;
};
Object.defineProperty(RawPacket.prototype, "done", {
get: function () {
return this._done;
}
});

Просмотреть файл

@ -19,8 +19,7 @@ Cu.import("chrome://marionette/content/driver.js");
Cu.import("chrome://marionette/content/error.js");
Cu.import("chrome://marionette/content/message.js");
// Bug 1083711: Load transport.js as an SDK module instead of subscript
loader.loadSubScript("resource://devtools/shared/transport/transport.js");
Cu.import("chrome://marionette/content/transport.js");
const logger = Log.repository.getLogger("Marionette");

Просмотреть файл

@ -0,0 +1,242 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
const {Constructor: CC, classes: Cc, interfaces: Ci, utils: Cu} = Components;
Cu.import("resource://devtools/shared/event-emitter.js");
Cu.import("resource://gre/modules/Services.jsm");
const IOUtil = Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
const ScriptableInputStream =
CC("@mozilla.org/scriptableinputstream;1",
"nsIScriptableInputStream", "init");
this.EXPORTED_SYMBOLS = ["StreamUtils"];
const BUFFER_SIZE = 0x8000;
/**
* This helper function (and its companion object) are used by bulk senders and
* receivers to read and write data in and out of other streams. Functions that
* make use of this tool are passed to callers when it is time to read or write
* bulk data. It is highly recommended to use these copier functions instead of
* the stream directly because the copier enforces the agreed upon length.
* Since bulk mode reuses an existing stream, the sender and receiver must write
* and read exactly the agreed upon amount of data, or else the entire transport
* will be left in a invalid state. Additionally, other methods of stream
* copying (such as NetUtil.asyncCopy) close the streams involved, which would
* terminate the debugging transport, and so it is avoided here.
*
* Overall, this *works*, but clearly the optimal solution would be able to just
* use the streams directly. If it were possible to fully implement
* nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
* enforce the length and avoid closing, and consumers could use familiar stream
* utilities like NetUtil.asyncCopy.
*
* The function takes two async streams and copies a precise number of bytes
* from one to the other. Copying begins immediately, but may complete at some
* future time depending on data size. Use the returned promise to know when
* it's complete.
*
* @param input nsIAsyncInputStream
* The stream to copy from.
* @param output nsIAsyncOutputStream
* The stream to copy to.
* @param length Integer
* The amount of data that needs to be copied.
* @return Promise
* The promise is resolved when copying completes or rejected if any
* (unexpected) errors occur.
*/
function copyStream(input, output, length) {
let copier = new StreamCopier(input, output, length);
return copier.copy();
}
function StreamCopier(input, output, length) {
EventEmitter.decorate(this);
this._id = StreamCopier._nextId++;
this.input = input;
// Save off the base output stream, since we know it's async as we've required
this.baseAsyncOutput = output;
if (IOUtil.outputStreamIsBuffered(output)) {
this.output = output;
} else {
this.output = Cc["@mozilla.org/network/buffered-output-stream;1"]
.createInstance(Ci.nsIBufferedOutputStream);
this.output.init(output, BUFFER_SIZE);
}
this._length = length;
this._amountLeft = length;
this._deferred = {
promise: new Promise((resolve, reject) => {
this._deferred.resolve = resolve;
this._deferred.reject = reject;
})
};
this._copy = this._copy.bind(this);
this._flush = this._flush.bind(this);
this._destroy = this._destroy.bind(this);
// Copy promise's then method up to this object.
// Allows the copier to offer a promise interface for the simple succeed or
// fail scenarios, but also emit events (due to the EventEmitter) for other
// states, like progress.
this.then = this._deferred.promise.then.bind(this._deferred.promise);
this.then(this._destroy, this._destroy);
// Stream ready callback starts as |_copy|, but may switch to |_flush| at end
// if flushing would block the output stream.
this._streamReadyCallback = this._copy;
}
StreamCopier._nextId = 0;
StreamCopier.prototype = {
copy: function () {
// Dispatch to the next tick so that it's possible to attach a progress
// event listener, even for extremely fast copies (like when testing).
Services.tm.currentThread.dispatch(() => {
try {
this._copy();
} catch (e) {
this._deferred.reject(e);
}
}, 0);
return this;
},
_copy: function () {
let bytesAvailable = this.input.available();
let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
this._debug("Trying to copy: " + amountToCopy);
let bytesCopied;
try {
bytesCopied = this.output.writeFrom(this.input, amountToCopy);
} catch (e) {
if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
this._debug("Base stream would block, will retry");
this._debug("Waiting for output stream");
this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
return;
}
throw e;
}
this._amountLeft -= bytesCopied;
this._debug("Copied: " + bytesCopied +
", Left: " + this._amountLeft);
this._emitProgress();
if (this._amountLeft === 0) {
this._debug("Copy done!");
this._flush();
return;
}
this._debug("Waiting for input stream");
this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
},
_emitProgress: function () {
this.emit("progress", {
bytesSent: this._length - this._amountLeft,
totalBytes: this._length
});
},
_flush: function () {
try {
this.output.flush();
} catch (e) {
if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
e.result == Cr.NS_ERROR_FAILURE) {
this._debug("Flush would block, will retry");
this._streamReadyCallback = this._flush;
this._debug("Waiting for output stream");
this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
return;
}
throw e;
}
this._deferred.resolve();
},
_destroy: function () {
this._destroy = null;
this._copy = null;
this._flush = null;
this.input = null;
this.output = null;
},
// nsIInputStreamCallback
onInputStreamReady: function () {
this._streamReadyCallback();
},
// nsIOutputStreamCallback
onOutputStreamReady: function () {
this._streamReadyCallback();
},
_debug: function (msg) {
}
};
/**
* Read from a stream, one byte at a time, up to the next |delimiter|
* character, but stopping if we've read |count| without finding it. Reading
* also terminates early if there are less than |count| bytes available on the
* stream. In that case, we only read as many bytes as the stream currently has
* to offer.
* TODO: This implementation could be removed if bug 984651 is fixed, which
* provides a native version of the same idea.
* @param stream nsIInputStream
* The input stream to read from.
* @param delimiter string
* The character we're trying to find.
* @param count integer
* The max number of characters to read while searching.
* @return string
* The data collected. If the delimiter was found, this string will
* end with it.
*/
function delimitedRead(stream, delimiter, count) {
let scriptableStream;
if (stream instanceof Ci.nsIScriptableInputStream) {
scriptableStream = stream;
} else {
scriptableStream = new ScriptableInputStream(stream);
}
let data = "";
// Don't exceed what's available on the stream
count = Math.min(count, stream.available());
if (count <= 0) {
return data;
}
let char;
while (char !== delimiter && count > 0) {
char = scriptableStream.readBytes(1);
count--;
data += char;
}
return data;
}
const StreamUtils = {
copyStream,
delimitedRead
};

Просмотреть файл

@ -0,0 +1,896 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/* global Pipe, ScriptableInputStream, uneval */
const {Constructor: CC, classes: Cc, interfaces: Ci, utils: Cu, results: Cr} = Components;
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://devtools/shared/event-emitter.js");
Cu.import("chrome://marionette/content/stream-utils.js");
const { Packet, JSONPacket, BulkPacket } =
Cu.import("chrome://marionette/content/packets.js");
const defer = function () {
let deferred = {
promise: new Promise((resolve, reject) => {
deferred.resolve = resolve;
deferred.reject = reject;
})
};
return deferred;
};
const executeSoon = function (func) {
Services.tm.mainThread.dispatch(func, Ci.nsIThread.DISPATCH_NORMAL);
};
const flags = { wantVerbose: false, wantLogging: false };
const dumpv =
flags.wantVerbose ?
function (msg) {dump(msg + "\n");} :
function () {};
const Pipe = CC("@mozilla.org/pipe;1", "nsIPipe", "init");
const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
"nsIScriptableInputStream", "init");
this.EXPORTED_SYMBOLS = ["DebuggerTransport"];
const PACKET_HEADER_MAX = 200;
/**
* An adapter that handles data transfers between the debugger client and
* server. It can work with both nsIPipe and nsIServerSocket transports so
* long as the properly created input and output streams are specified.
* (However, for intra-process connections, LocalDebuggerTransport, below,
* is more efficient than using an nsIPipe pair with DebuggerTransport.)
*
* @param input nsIAsyncInputStream
* The input stream.
* @param output nsIAsyncOutputStream
* The output stream.
*
* Given a DebuggerTransport instance dt:
* 1) Set dt.hooks to a packet handler object (described below).
* 2) Call dt.ready() to begin watching for input packets.
* 3) Call dt.send() / dt.startBulkSend() to send packets.
* 4) Call dt.close() to close the connection, and disengage from the event
* loop.
*
* A packet handler is an object with the following methods:
*
* - onPacket(packet) - called when we have received a complete packet.
* |packet| is the parsed form of the packet --- a JavaScript value, not
* a JSON-syntax string.
*
* - onBulkPacket(packet) - called when we have switched to bulk packet
* receiving mode. |packet| is an object containing:
* * actor: Name of actor that will receive the packet
* * type: Name of actor's method that should be called on receipt
* * length: Size of the data to be read
* * stream: This input stream should only be used directly if you can ensure
* that you will read exactly |length| bytes and will not close the
* stream when reading is complete
* * done: If you use the stream directly (instead of |copyTo| below), you
* must signal completion by resolving / rejecting this deferred.
* If it's rejected, the transport will be closed. If an Error is
* supplied as a rejection value, it will be logged via |dump|.
* If you do use |copyTo|, resolving is taken care of for you when
* copying completes.
* * copyTo: A helper function for getting your data out of the stream that
* meets the stream handling requirements above, and has the
* following signature:
* @param output nsIAsyncOutputStream
* The stream to copy to.
* @return Promise
* The promise is resolved when copying completes or rejected if any
* (unexpected) errors occur.
* This object also emits "progress" events for each chunk that is
* copied. See stream-utils.js.
*
* - onClosed(reason) - called when the connection is closed. |reason| is
* an optional nsresult or object, typically passed when the transport is
* closed due to some error in a underlying stream.
*
* See ./packets.js and the Remote Debugging Protocol specification for more
* details on the format of these packets.
*/
function DebuggerTransport(input, output) {
EventEmitter.decorate(this);
this._input = input;
this._scriptableInput = new ScriptableInputStream(input);
this._output = output;
// The current incoming (possibly partial) header, which will determine which
// type of Packet |_incoming| below will become.
this._incomingHeader = "";
// The current incoming Packet object
this._incoming = null;
// A queue of outgoing Packet objects
this._outgoing = [];
this.hooks = null;
this.active = false;
this._incomingEnabled = true;
this._outgoingEnabled = true;
this.close = this.close.bind(this);
}
DebuggerTransport.prototype = {
/**
* Transmit an object as a JSON packet.
*
* This method returns immediately, without waiting for the entire
* packet to be transmitted, registering event handlers as needed to
* transmit the entire packet. Packets are transmitted in the order
* they are passed to this method.
*/
send: function (object) {
this.emit("send", object);
let packet = new JSONPacket(this);
packet.object = object;
this._outgoing.push(packet);
this._flushOutgoing();
},
/**
* Transmit streaming data via a bulk packet.
*
* This method initiates the bulk send process by queuing up the header data.
* The caller receives eventual access to a stream for writing.
*
* N.B.: Do *not* attempt to close the stream handed to you, as it will
* continue to be used by this transport afterwards. Most users should
* instead use the provided |copyFrom| function instead.
*
* @param header Object
* This is modeled after the format of JSON packets above, but does not
* actually contain the data, but is instead just a routing header:
* * actor: Name of actor that will receive the packet
* * type: Name of actor's method that should be called on receipt
* * length: Size of the data to be sent
* @return Promise
* The promise will be resolved when you are allowed to write to the
* stream with an object containing:
* * stream: This output stream should only be used directly if
* you can ensure that you will write exactly |length|
* bytes and will not close the stream when writing is
* complete
* * done: If you use the stream directly (instead of |copyFrom|
* below), you must signal completion by resolving /
* rejecting this deferred. If it's rejected, the
* transport will be closed. If an Error is supplied as
* a rejection value, it will be logged via |dump|. If
* you do use |copyFrom|, resolving is taken care of for
* you when copying completes.
* * copyFrom: A helper function for getting your data onto the
* stream that meets the stream handling requirements
* above, and has the following signature:
* @param input nsIAsyncInputStream
* The stream to copy from.
* @return Promise
* The promise is resolved when copying completes or
* rejected if any (unexpected) errors occur.
* This object also emits "progress" events for each chunk
* that is copied. See stream-utils.js.
*/
startBulkSend: function (header) {
this.emit("startbulksend", header);
let packet = new BulkPacket(this);
packet.header = header;
this._outgoing.push(packet);
this._flushOutgoing();
return packet.streamReadyForWriting;
},
/**
* Close the transport.
* @param reason nsresult / object (optional)
* The status code or error message that corresponds to the reason for
* closing the transport (likely because a stream closed or failed).
*/
close: function (reason) {
this.emit("close", reason);
this.active = false;
this._input.close();
this._scriptableInput.close();
this._output.close();
this._destroyIncoming();
this._destroyAllOutgoing();
if (this.hooks) {
this.hooks.onClosed(reason);
this.hooks = null;
}
if (reason) {
dumpv("Transport closed: " + reason);
} else {
dumpv("Transport closed.");
}
},
/**
* The currently outgoing packet (at the top of the queue).
*/
get _currentOutgoing() {
return this._outgoing[0];
},
/**
* Flush data to the outgoing stream. Waits until the output stream notifies
* us that it is ready to be written to (via onOutputStreamReady).
*/
_flushOutgoing: function () {
if (!this._outgoingEnabled || this._outgoing.length === 0) {
return;
}
// If the top of the packet queue has nothing more to send, remove it.
if (this._currentOutgoing.done) {
this._finishCurrentOutgoing();
}
if (this._outgoing.length > 0) {
let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
this._output.asyncWait(this, 0, 0, threadManager.currentThread);
}
},
/**
* Pause this transport's attempts to write to the output stream. This is
* used when we've temporarily handed off our output stream for writing bulk
* data.
*/
pauseOutgoing: function () {
this._outgoingEnabled = false;
},
/**
* Resume this transport's attempts to write to the output stream.
*/
resumeOutgoing: function () {
this._outgoingEnabled = true;
this._flushOutgoing();
},
// nsIOutputStreamCallback
/**
* This is called when the output stream is ready for more data to be written.
* The current outgoing packet will attempt to write some amount of data, but
* may not complete.
*/
onOutputStreamReady: function (stream) {
if (!this._outgoingEnabled || this._outgoing.length === 0) {
return;
}
try {
this._currentOutgoing.write(stream);
} catch (e) {
if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
this.close(e.result);
return;
}
throw e;
}
this._flushOutgoing();
},
/**
* Remove the current outgoing packet from the queue upon completion.
*/
_finishCurrentOutgoing: function () {
if (this._currentOutgoing) {
this._currentOutgoing.destroy();
this._outgoing.shift();
}
},
/**
* Clear the entire outgoing queue.
*/
_destroyAllOutgoing: function () {
for (let packet of this._outgoing) {
packet.destroy();
}
this._outgoing = [];
},
/**
* Initialize the input stream for reading. Once this method has been called,
* we watch for packets on the input stream, and pass them to the appropriate
* handlers via this.hooks.
*/
ready: function () {
this.active = true;
this._waitForIncoming();
},
/**
* Asks the input stream to notify us (via onInputStreamReady) when it is
* ready for reading.
*/
_waitForIncoming: function () {
if (this._incomingEnabled) {
let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
this._input.asyncWait(this, 0, 0, threadManager.currentThread);
}
},
/**
* Pause this transport's attempts to read from the input stream. This is
* used when we've temporarily handed off our input stream for reading bulk
* data.
*/
pauseIncoming: function () {
this._incomingEnabled = false;
},
/**
* Resume this transport's attempts to read from the input stream.
*/
resumeIncoming: function () {
this._incomingEnabled = true;
this._flushIncoming();
this._waitForIncoming();
},
// nsIInputStreamCallback
/**
* Called when the stream is either readable or closed.
*/
onInputStreamReady: function (stream) {
try {
while (stream.available() && this._incomingEnabled &&
this._processIncoming(stream, stream.available())) {
// Loop until there is nothing more to process
}
this._waitForIncoming();
} catch (e) {
if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
this.close(e.result);
} else {
throw e;
}
}
},
/**
* Process the incoming data. Will create a new currently incoming Packet if
* needed. Tells the incoming Packet to read as much data as it can, but
* reading may not complete. The Packet signals that its data is ready for
* delivery by calling one of this transport's _on*Ready methods (see
* ./packets.js and the _on*Ready methods below).
* @return boolean
* Whether incoming stream processing should continue for any
* remaining data.
*/
_processIncoming: function (stream, count) {
dumpv("Data available: " + count);
if (!count) {
dumpv("Nothing to read, skipping");
return false;
}
try {
if (!this._incoming) {
dumpv("Creating a new packet from incoming");
if (!this._readHeader(stream)) {
// Not enough data to read packet type
return false;
}
// Attempt to create a new Packet by trying to parse each possible
// header pattern.
this._incoming = Packet.fromHeader(this._incomingHeader, this);
if (!this._incoming) {
throw new Error("No packet types for header: " +
this._incomingHeader);
}
}
if (!this._incoming.done) {
// We have an incomplete packet, keep reading it.
dumpv("Existing packet incomplete, keep reading");
this._incoming.read(stream, this._scriptableInput);
}
} catch (e) {
let msg = "Error reading incoming packet: (" + e + " - " + e.stack + ")";
dump(msg + "\n");
// Now in an invalid state, shut down the transport.
this.close();
return false;
}
if (!this._incoming.done) {
// Still not complete, we'll wait for more data.
dumpv("Packet not done, wait for more");
return true;
}
// Ready for next packet
this._flushIncoming();
return true;
},
/**
* Read as far as we can into the incoming data, attempting to build up a
* complete packet header (which terminates with ":"). We'll only read up to
* PACKET_HEADER_MAX characters.
* @return boolean
* True if we now have a complete header.
*/
_readHeader: function () {
let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
this._incomingHeader +=
StreamUtils.delimitedRead(this._scriptableInput, ":", amountToRead);
if (flags.wantVerbose) {
dumpv("Header read: " + this._incomingHeader);
}
if (this._incomingHeader.endsWith(":")) {
if (flags.wantVerbose) {
dumpv("Found packet header successfully: " + this._incomingHeader);
}
return true;
}
if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
throw new Error("Failed to parse packet header!");
}
// Not enough data yet.
return false;
},
/**
* If the incoming packet is done, log it as needed and clear the buffer.
*/
_flushIncoming: function () {
if (!this._incoming.done) {
return;
}
if (flags.wantLogging) {
dumpv("Got: " + this._incoming);
}
this._destroyIncoming();
},
/**
* Handler triggered by an incoming JSONPacket completing it's |read| method.
* Delivers the packet to this.hooks.onPacket.
*/
_onJSONObjectReady: function (object) {
executeSoon(() => {
// Ensure the transport is still alive by the time this runs.
if (this.active) {
this.emit("packet", object);
this.hooks.onPacket(object);
}
});
},
/**
* Handler triggered by an incoming BulkPacket entering the |read| phase for
* the stream portion of the packet. Delivers info about the incoming
* streaming data to this.hooks.onBulkPacket. See the main comment on the
* transport at the top of this file for more details.
*/
_onBulkReadReady: function (...args) {
executeSoon(() => {
// Ensure the transport is still alive by the time this runs.
if (this.active) {
this.emit("bulkpacket", ...args);
this.hooks.onBulkPacket(...args);
}
});
},
/**
* Remove all handlers and references related to the current incoming packet,
* either because it is now complete or because the transport is closing.
*/
_destroyIncoming: function () {
if (this._incoming) {
this._incoming.destroy();
}
this._incomingHeader = "";
this._incoming = null;
}
};
/**
* An adapter that handles data transfers between the debugger client and
* server when they both run in the same process. It presents the same API as
* DebuggerTransport, but instead of transmitting serialized messages across a
* connection it merely calls the packet dispatcher of the other side.
*
* @param other LocalDebuggerTransport
* The other endpoint for this debugger connection.
*
* @see DebuggerTransport
*/
function LocalDebuggerTransport(other) {
EventEmitter.decorate(this);
this.other = other;
this.hooks = null;
// A packet number, shared between this and this.other. This isn't used by the
// protocol at all, but it makes the packet traces a lot easier to follow.
this._serial = this.other ? this.other._serial : { count: 0 };
this.close = this.close.bind(this);
}
LocalDebuggerTransport.prototype = {
/**
* Transmit a message by directly calling the onPacket handler of the other
* endpoint.
*/
send: function (packet) {
this.emit("send", packet);
let serial = this._serial.count++;
if (flags.wantLogging) {
// Check 'from' first, as 'echo' packets have both.
if (packet.from) {
dumpv("Packet " + serial + " sent from " + uneval(packet.from));
} else if (packet.to) {
dumpv("Packet " + serial + " sent to " + uneval(packet.to));
}
}
this._deepFreeze(packet);
let other = this.other;
if (other) {
executeSoon(() => {
// Avoid the cost of JSON.stringify() when logging is disabled.
if (flags.wantLogging) {
dumpv("Received packet " + serial + ": " + JSON.stringify(packet, null, 2));
}
if (other.hooks) {
other.emit("packet", packet);
other.hooks.onPacket(packet);
}
});
}
},
/**
* Send a streaming bulk packet directly to the onBulkPacket handler of the
* other endpoint.
*
* This case is much simpler than the full DebuggerTransport, since there is
* no primary stream we have to worry about managing while we hand it off to
* others temporarily. Instead, we can just make a single use pipe and be
* done with it.
*/
startBulkSend: function ({actor, type, length}) {
this.emit("startbulksend", {actor, type, length});
let serial = this._serial.count++;
dumpv("Sent bulk packet " + serial + " for actor " + actor);
if (!this.other) {
let error = new Error("startBulkSend: other side of transport missing");
return Promise.reject(error);
}
let pipe = new Pipe(true, true, 0, 0, null);
executeSoon(() => {
dumpv("Received bulk packet " + serial);
if (!this.other.hooks) {
return;
}
// Receiver
let deferred = defer();
let packet = {
actor: actor,
type: type,
length: length,
copyTo: (output) => {
let copying =
StreamUtils.copyStream(pipe.inputStream, output, length);
deferred.resolve(copying);
return copying;
},
stream: pipe.inputStream,
done: deferred
};
this.other.emit("bulkpacket", packet);
this.other.hooks.onBulkPacket(packet);
// Await the result of reading from the stream
deferred.promise.then(() => pipe.inputStream.close(), this.close);
});
// Sender
let sendDeferred = defer();
// The remote transport is not capable of resolving immediately here, so we
// shouldn't be able to either.
executeSoon(() => {
let copyDeferred = defer();
sendDeferred.resolve({
copyFrom: (input) => {
let copying =
StreamUtils.copyStream(input, pipe.outputStream, length);
copyDeferred.resolve(copying);
return copying;
},
stream: pipe.outputStream,
done: copyDeferred
});
// Await the result of writing to the stream
copyDeferred.promise.then(() => pipe.outputStream.close(), this.close);
});
return sendDeferred.promise;
},
/**
* Close the transport.
*/
close: function () {
this.emit("close");
if (this.other) {
// Remove the reference to the other endpoint before calling close(), to
// avoid infinite recursion.
let other = this.other;
this.other = null;
other.close();
}
if (this.hooks) {
try {
this.hooks.onClosed();
} catch (ex) {
console.error(ex);
}
this.hooks = null;
}
},
/**
* An empty method for emulating the DebuggerTransport API.
*/
ready: function () {},
/**
* Helper function that makes an object fully immutable.
*/
_deepFreeze: function (object) {
Object.freeze(object);
for (let prop in object) {
// Freeze the properties that are objects, not on the prototype, and not
// already frozen. Note that this might leave an unfrozen reference
// somewhere in the object if there is an already frozen object containing
// an unfrozen object.
if (object.hasOwnProperty(prop) && typeof object === "object" &&
!Object.isFrozen(object)) {
this._deepFreeze(object[prop]);
}
}
}
};
/**
* A transport for the debugging protocol that uses nsIMessageManagers to
* exchange packets with servers running in child processes.
*
* In the parent process, |mm| should be the nsIMessageSender for the
* child process. In a child process, |mm| should be the child process
* message manager, which sends packets to the parent.
*
* |prefix| is a string included in the message names, to distinguish
* multiple servers running in the same child process.
*
* This transport exchanges messages named 'debug:<prefix>:packet', where
* <prefix> is |prefix|, whose data is the protocol packet.
*/
function ChildDebuggerTransport(mm, prefix) {
EventEmitter.decorate(this);
this._mm = mm;
this._messageName = "debug:" + prefix + ":packet";
}
/*
* To avoid confusion, we use 'message' to mean something that
* nsIMessageSender conveys, and 'packet' to mean a remote debugging
* protocol packet.
*/
ChildDebuggerTransport.prototype = {
constructor: ChildDebuggerTransport,
hooks: null,
_addListener() {
this._mm.addMessageListener(this._messageName, this);
},
_removeListener() {
try {
this._mm.removeMessageListener(this._messageName, this);
} catch (e) {
if (e.result != Cr.NS_ERROR_NULL_POINTER) {
throw e;
}
// In some cases, especially when using messageManagers in non-e10s mode, we reach
// this point with a dead messageManager which only throws errors but does not
// seem to indicate in any other way that it is dead.
}
},
ready: function () {
this._addListener();
},
close: function () {
this._removeListener();
this.emit("close");
this.hooks.onClosed();
},
receiveMessage: function ({data}) {
this.emit("packet", data);
this.hooks.onPacket(data);
},
send: function (packet) {
this.emit("send", packet);
try {
this._mm.sendAsyncMessage(this._messageName, packet);
} catch (e) {
if (e.result != Cr.NS_ERROR_NULL_POINTER) {
throw e;
}
// In some cases, especially when using messageManagers in non-e10s mode, we reach
// this point with a dead messageManager which only throws errors but does not
// seem to indicate in any other way that it is dead.
}
},
startBulkSend: function () {
throw new Error("Can't send bulk data to child processes.");
},
swapBrowser(mm) {
this._removeListener();
this._mm = mm;
this._addListener();
},
};
// WorkerDebuggerTransport is defined differently depending on whether we are
// on the main thread or a worker thread. In the former case, we are required
// by the devtools loader, and isWorker will be false. Otherwise, we are
// required by the worker loader, and isWorker will be true.
//
// Each worker debugger supports only a single connection to the main thread.
// However, its theoretically possible for multiple servers to connect to the
// same worker. Consequently, each transport has a connection id, to allow
// messages from multiple connections to be multiplexed on a single channel.
if (!this.isWorker) {
// Main thread
(function () {
/**
* A transport that uses a WorkerDebugger to send packets from the main
* thread to a worker thread.
*/
function WorkerDebuggerTransport(dbg, id) {
this._dbg = dbg;
this._id = id;
this.onMessage = this._onMessage.bind(this);
}
WorkerDebuggerTransport.prototype = {
constructor: WorkerDebuggerTransport,
ready: function () {
this._dbg.addListener(this);
},
close: function () {
this._dbg.removeListener(this);
if (this.hooks) {
this.hooks.onClosed();
}
},
send: function (packet) {
this._dbg.postMessage(JSON.stringify({
type: "message",
id: this._id,
message: packet
}));
},
startBulkSend: function () {
throw new Error("Can't send bulk data from worker threads!");
},
_onMessage: function (message) {
let packet = JSON.parse(message);
if (packet.type !== "message" || packet.id !== this._id) {
return;
}
if (this.hooks) {
this.hooks.onPacket(packet.message);
}
}
};
}).call(this);
} else {
// Worker thread
(function () {
/**
* A transport that uses a WorkerDebuggerGlobalScope to send packets from a
* worker thread to the main thread.
*/
function WorkerDebuggerTransport(scope, id) {
this._scope = scope;
this._id = id;
this._onMessage = this._onMessage.bind(this);
}
WorkerDebuggerTransport.prototype = {
constructor: WorkerDebuggerTransport,
ready: function () {
this._scope.addEventListener("message", this._onMessage);
},
close: function () {
this._scope.removeEventListener("message", this._onMessage);
if (this.hooks) {
this.hooks.onClosed();
}
},
send: function (packet) {
this._scope.postMessage(JSON.stringify({
type: "message",
id: this._id,
message: packet
}));
},
startBulkSend: function () {
throw new Error("Can't send bulk data from worker threads!");
},
_onMessage: function (event) {
let packet = JSON.parse(event.data);
if (packet.type !== "message" || packet.id !== this._id) {
return;
}
if (this.hooks) {
this.hooks.onPacket(packet.message);
}
}
};
}).call(this);
}