зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1563685
- Rename WebSocket.jsm and drop unused accept method. r=remote-protocol-reviewers,jdescottes,ato
Differential Revision: https://phabricator.services.mozilla.com/D37039 --HG-- rename : remote/server/WebSocket.jsm => remote/server/WebSocketHandshake.jsm extra : moz-landing-system : lando
This commit is contained in:
Родитель
b6ef132da1
Коммит
6713234855
|
@ -53,8 +53,7 @@ remote.jar:
|
|||
|
||||
# transport layer
|
||||
content/server/HTTPD.jsm (../netwerk/test/httpserver/httpd.js)
|
||||
content/server/Stream.jsm (server/Stream.jsm)
|
||||
content/server/WebSocket.jsm (server/WebSocket.jsm)
|
||||
content/server/WebSocketHandshake.jsm (server/WebSocketHandshake.jsm)
|
||||
content/server/WebSocketTransport.jsm (server/WebSocketTransport.jsm)
|
||||
|
||||
# imports from external folders
|
||||
|
|
|
@ -1,249 +0,0 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
"use strict";
|
||||
|
||||
var EXPORTED_SYMBOLS = ["Stream"];
|
||||
|
||||
// This is an XPCOM service-ified copy of ../devtools/shared/transport/stream-utils.js.
|
||||
|
||||
const CC = Components.Constructor;
|
||||
|
||||
const {EventEmitter} = ChromeUtils.import("resource://gre/modules/EventEmitter.jsm");
|
||||
const {Services} = ChromeUtils.import("resource://gre/modules/Services.jsm");
|
||||
|
||||
const IOUtil = Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
|
||||
const ScriptableInputStream = CC("@mozilla.org/scriptableinputstream;1",
|
||||
"nsIScriptableInputStream", "init");
|
||||
|
||||
const BUFFER_SIZE = 0x8000;
|
||||
|
||||
/**
|
||||
* This helper function (and its companion object) are used by bulk
|
||||
* senders and receivers to read and write data in and out of other streams.
|
||||
* Functions that make use of this tool are passed to callers when it is
|
||||
* time to read or write bulk data. It is highly recommended to use these
|
||||
* copier functions instead of the stream directly because the copier
|
||||
* enforces the agreed upon length. Since bulk mode reuses an existing
|
||||
* stream, the sender and receiver must write and read exactly the agreed
|
||||
* upon amount of data, or else the entire transport will be left in a
|
||||
* invalid state. Additionally, other methods of stream copying (such as
|
||||
* NetUtil.asyncCopy) close the streams involved, which would terminate
|
||||
* the debugging transport, and so it is avoided here.
|
||||
*
|
||||
* Overall, this *works*, but clearly the optimal solution would be
|
||||
* able to just use the streams directly. If it were possible to fully
|
||||
* implement nsIInputStream/nsIOutputStream in JS, wrapper streams could
|
||||
* be created to enforce the length and avoid closing, and consumers could
|
||||
* use familiar stream utilities like NetUtil.asyncCopy.
|
||||
*
|
||||
* The function takes two async streams and copies a precise number
|
||||
* of bytes from one to the other. Copying begins immediately, but may
|
||||
* complete at some future time depending on data size. Use the returned
|
||||
* promise to know when it's complete.
|
||||
*
|
||||
* @param {nsIAsyncInputStream} input
|
||||
* Stream to copy from.
|
||||
* @param {nsIAsyncOutputStream} output
|
||||
* Stream to copy to.
|
||||
* @param {number} length
|
||||
* Amount of data that needs to be copied.
|
||||
*
|
||||
* @return {Promise}
|
||||
* Promise is resolved when copying completes or rejected if any
|
||||
* (unexpected) errors occur.
|
||||
*/
|
||||
function copyStream(input, output, length) {
|
||||
let copier = new StreamCopier(input, output, length);
|
||||
return copier.copy();
|
||||
}
|
||||
|
||||
/** @class */
|
||||
function StreamCopier(input, output, length) {
|
||||
EventEmitter.decorate(this);
|
||||
this._id = StreamCopier._nextId++;
|
||||
this.input = input;
|
||||
// Save off the base output stream, since we know it's async as we've
|
||||
// required
|
||||
this.baseAsyncOutput = output;
|
||||
if (IOUtil.outputStreamIsBuffered(output)) {
|
||||
this.output = output;
|
||||
} else {
|
||||
this.output = Cc["@mozilla.org/network/buffered-output-stream;1"]
|
||||
.createInstance(Ci.nsIBufferedOutputStream);
|
||||
this.output.init(output, BUFFER_SIZE);
|
||||
}
|
||||
this._length = length;
|
||||
this._amountLeft = length;
|
||||
this._deferred = {
|
||||
promise: new Promise((resolve, reject) => {
|
||||
this._deferred.resolve = resolve;
|
||||
this._deferred.reject = reject;
|
||||
}),
|
||||
};
|
||||
|
||||
this._copy = this._copy.bind(this);
|
||||
this._flush = this._flush.bind(this);
|
||||
this._destroy = this._destroy.bind(this);
|
||||
|
||||
// Copy promise's then method up to this object.
|
||||
//
|
||||
// Allows the copier to offer a promise interface for the simple succeed
|
||||
// or fail scenarios, but also emit events (due to the EventEmitter)
|
||||
// for other states, like progress.
|
||||
this.then = this._deferred.promise.then.bind(this._deferred.promise);
|
||||
this.then(this._destroy, this._destroy);
|
||||
|
||||
// Stream ready callback starts as |_copy|, but may switch to |_flush|
|
||||
// at end if flushing would block the output stream.
|
||||
this._streamReadyCallback = this._copy;
|
||||
}
|
||||
StreamCopier._nextId = 0;
|
||||
|
||||
StreamCopier.prototype = {
|
||||
|
||||
copy() {
|
||||
// Dispatch to the next tick so that it's possible to attach a progress
|
||||
// event listener, even for extremely fast copies (like when testing).
|
||||
Services.tm.currentThread.dispatch(() => {
|
||||
try {
|
||||
this._copy();
|
||||
} catch (e) {
|
||||
this._deferred.reject(e);
|
||||
}
|
||||
}, 0);
|
||||
return this;
|
||||
},
|
||||
|
||||
_copy() {
|
||||
let bytesAvailable = this.input.available();
|
||||
let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
|
||||
this._debug("Trying to copy: " + amountToCopy);
|
||||
|
||||
let bytesCopied;
|
||||
try {
|
||||
bytesCopied = this.output.writeFrom(this.input, amountToCopy);
|
||||
} catch (e) {
|
||||
if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
|
||||
this._debug("Base stream would block, will retry");
|
||||
this._debug("Waiting for output stream");
|
||||
this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
|
||||
this._amountLeft -= bytesCopied;
|
||||
this._debug("Copied: " + bytesCopied +
|
||||
", Left: " + this._amountLeft);
|
||||
this._emitProgress();
|
||||
|
||||
if (this._amountLeft === 0) {
|
||||
this._debug("Copy done!");
|
||||
this._flush();
|
||||
return;
|
||||
}
|
||||
|
||||
this._debug("Waiting for input stream");
|
||||
this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
|
||||
},
|
||||
|
||||
_emitProgress() {
|
||||
this.emit("progress", {
|
||||
bytesSent: this._length - this._amountLeft,
|
||||
totalBytes: this._length,
|
||||
});
|
||||
},
|
||||
|
||||
_flush() {
|
||||
try {
|
||||
this.output.flush();
|
||||
} catch (e) {
|
||||
if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
|
||||
e.result == Cr.NS_ERROR_FAILURE) {
|
||||
this._debug("Flush would block, will retry");
|
||||
this._streamReadyCallback = this._flush;
|
||||
this._debug("Waiting for output stream");
|
||||
this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
|
||||
return;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
this._deferred.resolve();
|
||||
},
|
||||
|
||||
_destroy() {
|
||||
this._destroy = null;
|
||||
this._copy = null;
|
||||
this._flush = null;
|
||||
this.input = null;
|
||||
this.output = null;
|
||||
},
|
||||
|
||||
// nsIInputStreamCallback
|
||||
onInputStreamReady() {
|
||||
this._streamReadyCallback();
|
||||
},
|
||||
|
||||
// nsIOutputStreamCallback
|
||||
onOutputStreamReady() {
|
||||
this._streamReadyCallback();
|
||||
},
|
||||
|
||||
_debug() {
|
||||
},
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* Read from a stream, one byte at a time, up to the next
|
||||
* <var>delimiter</var> character, but stopping if we've read |count|
|
||||
* without finding it. Reading also terminates early if there are less
|
||||
* than <var>count</var> bytes available on the stream. In that case,
|
||||
* we only read as many bytes as the stream currently has to offer.
|
||||
*
|
||||
* @param {nsIInputStream} stream
|
||||
* Input stream to read from.
|
||||
* @param {string} delimiter
|
||||
* Character we're trying to find.
|
||||
* @param {number} count
|
||||
* Max number of characters to read while searching.
|
||||
*
|
||||
* @return {string}
|
||||
* Collected data. If the delimiter was found, this string will
|
||||
* end with it.
|
||||
*/
|
||||
// TODO: This implementation could be removed if bug 984651 is fixed,
|
||||
// which provides a native version of the same idea.
|
||||
function delimitedRead(stream, delimiter, count) {
|
||||
let scriptableStream;
|
||||
if (stream instanceof Ci.nsIScriptableInputStream) {
|
||||
scriptableStream = stream;
|
||||
} else {
|
||||
scriptableStream = new ScriptableInputStream(stream);
|
||||
}
|
||||
|
||||
let data = "";
|
||||
|
||||
// Don't exceed what's available on the stream
|
||||
count = Math.min(count, stream.available());
|
||||
|
||||
if (count <= 0) {
|
||||
return data;
|
||||
}
|
||||
|
||||
let char;
|
||||
while (char !== delimiter && count > 0) {
|
||||
char = scriptableStream.readBytes(1);
|
||||
count--;
|
||||
data += char;
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
|
||||
this.Stream = {
|
||||
copyStream,
|
||||
delimitedRead,
|
||||
};
|
|
@ -4,16 +4,13 @@
|
|||
|
||||
"use strict";
|
||||
|
||||
var EXPORTED_SYMBOLS = ["WebSocketServer"];
|
||||
var EXPORTED_SYMBOLS = ["WebSocketHandshake"];
|
||||
|
||||
// This file is an XPCOM service-ified copy of ../devtools/server/socket/websocket-server.js.
|
||||
|
||||
const CC = Components.Constructor;
|
||||
|
||||
const { Services } = ChromeUtils.import("resource://gre/modules/Services.jsm");
|
||||
const { Stream } = ChromeUtils.import(
|
||||
"chrome://remote/content/server/Stream.jsm"
|
||||
);
|
||||
const { XPCOMUtils } = ChromeUtils.import(
|
||||
"resource://gre/modules/XPCOMUtils.jsm"
|
||||
);
|
||||
|
@ -29,53 +26,9 @@ const CryptoHash = CC(
|
|||
);
|
||||
const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
|
||||
|
||||
// limit the header size to put an upper bound on allocated memory
|
||||
const HEADER_MAX_LEN = 8000;
|
||||
|
||||
// TODO(ato): Merge this with httpd.js so that we can respond to both HTTP/1.1
|
||||
// as well as WebSocket requests on the same server.
|
||||
|
||||
/**
|
||||
* Read a line from async input stream
|
||||
* and return promise that resolves to the line once it has been read.
|
||||
* If the line is longer than HEADER_MAX_LEN, will throw error.
|
||||
*/
|
||||
function readLine(input) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let line = "";
|
||||
const wait = () => {
|
||||
input.asyncWait(
|
||||
stream => {
|
||||
try {
|
||||
const amountToRead = HEADER_MAX_LEN - line.length;
|
||||
line += Stream.delimitedRead(input, "\n", amountToRead);
|
||||
|
||||
if (line.endsWith("\n")) {
|
||||
resolve(line.trimRight());
|
||||
return;
|
||||
}
|
||||
|
||||
if (line.length >= HEADER_MAX_LEN) {
|
||||
throw new Error(
|
||||
`Failed to read HTTP header longer than ${HEADER_MAX_LEN} bytes`
|
||||
);
|
||||
}
|
||||
|
||||
wait();
|
||||
} catch (ex) {
|
||||
reject(ex);
|
||||
}
|
||||
},
|
||||
0,
|
||||
0,
|
||||
threadManager.currentThread
|
||||
);
|
||||
};
|
||||
|
||||
wait();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a string of bytes to async output stream
|
||||
* and return promise that resolves once all data has been written.
|
||||
|
@ -110,38 +63,6 @@ function writeString(output, data) {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Read HTTP request from async input stream.
|
||||
*
|
||||
* @return Request line (string) and Map of header names and values.
|
||||
*/
|
||||
const readHttpRequest = async function(input) {
|
||||
let requestLine = "";
|
||||
const headers = new Map();
|
||||
|
||||
while (true) {
|
||||
const line = await readLine(input);
|
||||
if (line.length == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!requestLine) {
|
||||
requestLine = line;
|
||||
} else {
|
||||
const colon = line.indexOf(":");
|
||||
if (colon == -1) {
|
||||
throw new Error(`Malformed HTTP header: ${line}`);
|
||||
}
|
||||
|
||||
const name = line.slice(0, colon).toLowerCase();
|
||||
const value = line.slice(colon + 1).trim();
|
||||
headers.set(name, value);
|
||||
}
|
||||
}
|
||||
|
||||
return { requestLine, headers };
|
||||
};
|
||||
|
||||
/** Write HTTP response (array of strings) to async output stream. */
|
||||
function writeHttpResponse(output, response) {
|
||||
const s = response.join("\r\n") + "\r\n\r\n";
|
||||
|
@ -250,18 +171,6 @@ async function createWebSocket(transport, input, output) {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Accept an incoming WebSocket server connection.
|
||||
* Takes an established nsISocketTransport in the parameters.
|
||||
* Performs the WebSocket handshake and waits for the WebSocket to open.
|
||||
* Returns Promise with a WebSocket ready to send and receive messages.
|
||||
*/
|
||||
async function accept(transport, input, output) {
|
||||
const request = await readHttpRequest(input);
|
||||
await serverHandshake(request, output);
|
||||
return createWebSocket(transport, input, output);
|
||||
}
|
||||
|
||||
/** Upgrade an existing HTTP request from httpd.js to WebSocket. */
|
||||
async function upgrade(request, response) {
|
||||
// handle response manually, allowing us to send arbitrary data
|
||||
|
@ -282,4 +191,4 @@ async function upgrade(request, response) {
|
|||
return createWebSocket(transport, input, output);
|
||||
}
|
||||
|
||||
const WebSocketServer = { accept, upgrade };
|
||||
const WebSocketHandshake = { upgrade };
|
|
@ -12,8 +12,8 @@ const { Connection } = ChromeUtils.import(
|
|||
const { WebSocketDebuggerTransport } = ChromeUtils.import(
|
||||
"chrome://remote/content/server/WebSocketTransport.jsm"
|
||||
);
|
||||
const { WebSocketServer } = ChromeUtils.import(
|
||||
"chrome://remote/content/server/WebSocket.jsm"
|
||||
const { WebSocketHandshake } = ChromeUtils.import(
|
||||
"chrome://remote/content/server/WebSocketHandshake.jsm"
|
||||
);
|
||||
|
||||
/**
|
||||
|
@ -42,7 +42,7 @@ class Target {
|
|||
// nsIHttpRequestHandler
|
||||
|
||||
async handle(request, response) {
|
||||
const so = await WebSocketServer.upgrade(request, response);
|
||||
const so = await WebSocketHandshake.upgrade(request, response);
|
||||
const transport = new WebSocketDebuggerTransport(so);
|
||||
const conn = new Connection(transport, response._connection);
|
||||
this.sessions.set(conn, new this.sessionClass(conn, this));
|
||||
|
|
Загрузка…
Ссылка в новой задаче