Bug 1106675 - Replace _queueOrRun with a promise-chain. r=mt

This commit is contained in:
Jan-Ivar Bruaroey 2014-12-04 11:12:19 -08:00
Родитель 7bdce1f5fc
Коммит 4f6ab2df19
1 изменённых файлов: 82 добавлений и 165 удалений

Просмотреть файл

@ -289,7 +289,6 @@ RTCIdentityAssertion.prototype = {
};
function RTCPeerConnection() {
this._queue = [];
this._senders = [];
this._receivers = [];
@ -312,16 +311,6 @@ function RTCPeerConnection() {
this._remoteType = null;
this._peerIdentity = null;
/**
* Everytime we get a request from content, we put it in the queue. If there
* are no pending operations though, we will execute it immediately. In
* PeerConnectionObserver, whenever we are notified that an operation has
* finished, we will check the queue for the next operation and execute if
* neccesary. The _pending flag indicates whether an operation is currently in
* progress.
*/
this._pending = false;
// States
this._iceGatheringState = this._iceConnectionState = "new";
}
@ -362,6 +351,7 @@ RTCPeerConnection.prototype = {
this.makeGetterSetterEH("onidpvalidationerror");
this._pc = new this._win.PeerConnectionImpl();
this._taskChain = this._win.Promise.resolve();
this.__DOM_IMPL__._innerObject = this;
this._observer = new this._win.PeerConnectionObserver(this.__DOM_IMPL__);
@ -369,14 +359,6 @@ RTCPeerConnection.prototype = {
// Add a reference to the PeerConnection to global list (before init).
_globalPCList.addPC(this);
this._queueOrRun({
func: this._initialize,
args: [rtcConfig],
wait: false
});
},
_initialize: function(rtcConfig) {
this._impl.initialize(this._observer, this._win, rtcConfig,
Services.tm.currentThread);
this._initIdp();
@ -402,35 +384,28 @@ RTCPeerConnection.prototype = {
},
/**
* Add a function to the queue or run it immediately if the queue is empty.
* Argument is an object with the func, args and wait properties; wait should
* be set to true if the function has a success/error callback that will call
* _executeNext, false if it doesn't have a callback.
* Add a function to the task chain.
* onSuccess - legacy callback (optional)
* onError - legacy callback (optional)
*/
_queueOrRun: function(obj) {
this._checkClosed();
if (this._pending) {
// We are waiting for a callback before doing any more work.
this._queue.push(obj);
} else {
this._pending = obj.wait;
obj.func.apply(this, obj.args);
}
_queue: function(func, onSuccess, onError) {
let p = this._taskChain.then(() => {
this._checkClosed(); // TODO: Move outside promise once Bug 1107592 is fixed.
return func();
});
this._taskChain = p.catch(() => {}); // don't propagate errors in taskChain!
return onSuccess? p.then(this._wrapLegacyCallback(onSuccess),
this._wrapLegacyCallback(onError)) : p;
},
// Pick the next item from the queue and run it.
_executeNext: function() {
// Maybe _pending should be a string, and we should
// take a string arg and make sure they match to detect errors?
this._pending = false;
while (this._queue.length && !this._pending) {
let obj = this._queue.shift();
// Doesn't re-queue since _pending is false
this._queueOrRun(obj);
}
_wrapLegacyCallback: function(func) {
return result => {
try {
func && func(result);
} catch (e) {
this.logErrorAndCallOnError(e);
}
};
},
/**
@ -553,21 +528,6 @@ RTCPeerConnection.prototype = {
});
},
// Helper for legacy callbacks
thenCB: function(p, onSuccess, onError) {
var errorFunc = this.logErrorAndCallOnError.bind(this);
function callCB(func, arg) {
try {
func(arg);
} catch (e) {
errorFunc(e);
}
}
return onSuccess? p.then(result => callCB(onSuccess, result),
reason => (onError? callCB(onError, reason) : null)) : p;
},
createOffer: function(optionsOrOnSuccess, onError, options) {
// TODO: Remove old constraint-like RTCOptions support soon (Bug 1064223).
@ -619,47 +579,35 @@ RTCPeerConnection.prototype = {
JSON.stringify(options) + " instead (note the case difference)!",
null, 0);
}
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._createOffer,
args: [resolve, reject, options],
wait: true
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._createOffer(options), onSuccess, onError);
},
_createOffer: function(onSuccess, onError, options) {
this._onCreateOfferSuccess = onSuccess;
this._onCreateOfferFailure = onError;
this._impl.createOffer(options);
_createOffer: function(options) {
return new this._win.Promise((resolve, reject) => {
this._onCreateOfferSuccess = resolve;
this._onCreateOfferFailure = reject;
this._impl.createOffer(options);
});
},
_createAnswer: function(onSuccess, onError) {
this._onCreateAnswerSuccess = onSuccess;
this._onCreateAnswerFailure = onError;
if (!this.remoteDescription) {
this._observer.onCreateAnswerError(Ci.IPeerConnection.kInvalidState,
"setRemoteDescription not called");
return;
}
if (this.remoteDescription.type != "offer") {
this._observer.onCreateAnswerError(Ci.IPeerConnection.kInvalidState,
"No outstanding offer");
return;
}
this._impl.createAnswer();
_createAnswer: function() {
return new this._win.Promise((resolve, reject) => {
if (!this.remoteDescription) {
throw new this._win.DOMError("InvalidStateError",
"setRemoteDescription not called");
}
if (this.remoteDescription.type != "offer") {
throw new this._win.DOMError("InvalidStateError",
"No outstanding offer");
}
this._onCreateAnswerSuccess = resolve;
this._onCreateAnswerFailure = reject;
this._impl.createAnswer();
});
},
createAnswer: function(onSuccess, onError) {
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._createAnswer,
args: [resolve, reject],
wait: true
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._createAnswer(), onSuccess, onError);
},
setLocalDescription: function(desc, onSuccess, onError) {
@ -680,18 +628,16 @@ RTCPeerConnection.prototype = {
"Invalid type " + desc.type + " provided to setLocalDescription");
}
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._setLocalDescription,
args: [type, desc.sdp, resolve, reject],
wait: true
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._setLocalDescription(type, desc.sdp),
onSuccess, onError);
},
_setLocalDescription: function(type, sdp, onSuccess, onError) {
this._onSetLocalDescriptionSuccess = onSuccess;
this._onSetLocalDescriptionFailure = onError;
this._impl.setLocalDescription(type, sdp);
_setLocalDescription: function(type, sdp) {
return new this._win.Promise((resolve, reject) => {
this._onSetLocalDescriptionSuccess = resolve;
this._onSetLocalDescriptionFailure = reject;
this._impl.setLocalDescription(type, sdp);
});
},
setRemoteDescription: function(desc, onSuccess, onError) {
@ -715,12 +661,8 @@ RTCPeerConnection.prototype = {
// Have to get caller's origin outside of Promise constructor and pass it in
let origin = Cu.getWebIDLCallerPrincipal().origin;
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._setRemoteDescription,
args: [type, desc.sdp, origin, resolve, reject],
wait: true
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._setRemoteDescription(type, desc.sdp, origin),
onSuccess, onError);
},
/**
@ -745,7 +687,12 @@ RTCPeerConnection.prototype = {
return good;
},
_setRemoteDescription: function(type, sdp, origin, onSuccess, onError) {
_setRemoteDescription: function(type, sdp, origin) {
return new this._win.Promise((resolve, reject) =>
this._setRemoteDescriptionImpl(type, sdp, origin, resolve, reject));
},
_setRemoteDescriptionImpl: function(type, sdp, origin, onSuccess, onError) {
let idpComplete = false;
let setRemoteComplete = false;
let idpError = null;
@ -762,7 +709,6 @@ RTCPeerConnection.prototype = {
// Violation of spec, but we allow it for now
onSuccess();
isDone = true;
this._executeNext();
};
let setRemoteDone = () => {
@ -839,22 +785,17 @@ RTCPeerConnection.prototype = {
throw new this._win.DOMError("InvalidParameterError",
"Invalid candidate passed to addIceCandidate!");
}
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._addIceCandidate,
args: [cand, resolve, reject],
wait: false
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._addIceCandidate(cand), onSuccess, onError);
},
_addIceCandidate: function(cand, onSuccess, onError) {
this._onAddIceCandidateSuccess = onSuccess || null;
this._onAddIceCandidateError = onError || null;
_addIceCandidate: function(cand) {
return new this._win.Promise((resolve, reject) => {
this._onAddIceCandidateSuccess = resolve;
this._onAddIceCandidateError = reject;
this._impl.addIceCandidate(cand.candidate, cand.sdpMid || "",
(cand.sdpMLineIndex === null) ? 0 :
this._impl.addIceCandidate(cand.candidate, cand.sdpMid || "",
cand.sdpMLineIndex);
});
},
addStream: function(stream) {
@ -891,7 +832,7 @@ RTCPeerConnection.prototype = {
throw new this._win.DOMError("NotSupportedError", "removeTrack not yet implemented");
},
_replaceTrack: function(sender, withTrack, onSuccess, onError) {
_replaceTrack: function(sender, withTrack) {
// TODO: Do a (sender._stream.getTracks().indexOf(track) == -1) check
// on both track args someday.
//
@ -903,11 +844,13 @@ RTCPeerConnection.prototype = {
// Since a track may be replaced more than once, the track being replaced
// may not be in the stream either, so we check neither arg right now.
this._onReplaceTrackSender = sender;
this._onReplaceTrackWithTrack = withTrack;
this._onReplaceTrackSuccess = onSuccess;
this._onReplaceTrackFailure = onError;
this._impl.replaceTrack(sender.track, withTrack, sender._stream);
return new this._win.Promise((resolve, reject) => {
this._onReplaceTrackSender = sender;
this._onReplaceTrackWithTrack = withTrack;
this._onReplaceTrackSuccess = resolve;
this._onReplaceTrackFailure = reject;
this._impl.replaceTrack(sender.track, withTrack, sender._stream);
});
},
close: function() {
@ -915,14 +858,10 @@ RTCPeerConnection.prototype = {
return;
}
this.changeIceConnectionState("closed");
this._queueOrRun({ func: this._close, args: [false], wait: false });
this._closed = true;
},
_close: function() {
this._localIdp.close();
this._remoteIdp.close();
this._impl.close();
this._closed = true;
},
getLocalStreams: function() {
@ -1022,19 +961,15 @@ RTCPeerConnection.prototype = {
},
getStats: function(selector, onSuccess, onError) {
let p = new this._win.Promise((resolve, reject) => this._queueOrRun({
func: this._getStats,
args: [selector, resolve, reject],
wait: false
}));
return this.thenCB(p, onSuccess, onError);
return this._queue(() => this._getStats(selector), onSuccess, onError);
},
_getStats: function(selector, onSuccess, onError) {
this._onGetStatsSuccess = onSuccess;
this._onGetStatsFailure = onError;
this._impl.getStats(selector);
_getStats: function(selector) {
return new this._win.Promise((resolve, reject) => {
this._onGetStatsSuccess = resolve;
this._onGetStatsFailure = reject;
this._impl.getStats(selector);
});
},
createDataChannel: function(label, dict) {
@ -1140,13 +1075,11 @@ PeerConnectionObserver.prototype = {
}
pc._onCreateOfferSuccess(new pc._win.mozRTCSessionDescription({ type: "offer",
sdp: sdp }));
pc._executeNext();
}.bind(this));
},
onCreateOfferError: function(code, message) {
this._dompc._onCreateOfferFailure(this.newError(code, message));
this._dompc._executeNext();
},
onCreateAnswerSuccess: function(sdp) {
@ -1159,45 +1092,37 @@ PeerConnectionObserver.prototype = {
}
pc._onCreateAnswerSuccess(new pc._win.mozRTCSessionDescription({ type: "answer",
sdp: sdp }));
pc._executeNext();
}.bind(this));
},
onCreateAnswerError: function(code, message) {
this._dompc._onCreateAnswerFailure(this.newError(code, message));
this._dompc._executeNext();
},
onSetLocalDescriptionSuccess: function() {
this._dompc._onSetLocalDescriptionSuccess();
this._dompc._executeNext();
},
onSetRemoteDescriptionSuccess: function() {
// This function calls _executeNext() for us
this._dompc._onSetRemoteDescriptionSuccess();
},
onSetLocalDescriptionError: function(code, message) {
this._localType = null;
this._dompc._onSetLocalDescriptionFailure(this.newError(code, message));
this._dompc._executeNext();
},
onSetRemoteDescriptionError: function(code, message) {
this._remoteType = null;
this._dompc._onSetRemoteDescriptionFailure(this.newError(code, message));
this._dompc._executeNext();
},
onAddIceCandidateSuccess: function() {
this._dompc._onAddIceCandidateSuccess();
this._dompc._executeNext();
},
onAddIceCandidateError: function(code, message) {
this._dompc._onAddIceCandidateError(this.newError(code, message));
this._dompc._executeNext();
},
onIceCandidate: function(level, mid, candidate) {
@ -1322,12 +1247,10 @@ PeerConnectionObserver.prototype = {
chromeobj);
chromeobj.makeStatsPublic();
this._dompc._onGetStatsSuccess(webidlobj);
this._dompc._executeNext();
},
onGetStatsError: function(code, message) {
this._dompc._onGetStatsFailure(this.newError(code, message));
this._dompc._executeNext();
},
onAddStream: function(stream) {
@ -1410,13 +1333,7 @@ RTCRtpSender.prototype = {
QueryInterface: XPCOMUtils.generateQI([Ci.nsISupports]),
replaceTrack: function(withTrack) {
this._pc._checkClosed();
return new this._pc._win.Promise((resolve, reject) => this._pc._queueOrRun({
func: this._pc._replaceTrack,
args: [this, withTrack, resolve, reject],
wait: false
}));
return this._pc._queue(() => this._pc._replaceTrack(this, withTrack));
}
};