initial linear implementation of Stream and Node

This commit is contained in:
Olivier Yiptong 2014-05-20 11:50:27 -04:00
Родитель 2b24e28628
Коммит a2dbb7a0ad
2 изменённых файлов: 270 добавлений и 0 удалений

180
lib/streams/core.js Normal file
Просмотреть файл

@ -0,0 +1,180 @@
"use strict";
const {Cu} = require("chrome");
Cu.import("resource://gre/modules/Task.jsm");
Cu.import("resource://gre/modules/Promise.jsm");
//const Promise = require('sdk/core/promise');
function NotImplementedError(message) {
this.message = message;
this.name = "NotImplementedError";
this.toString = function() {
return this.name + " " + this.message;
};
}
function Node(identifier, listenType, emitType) {
this.identifier = identifier;
this.listenType = listens;
this.emitType = emits;
this.emitDeferred = Promise.defer();
this.results = null;
}
Node.prototype = {
/*
* A Node consumes data and returns a promise when it is done processing.
*
* Bolts and Spouts are implemented using the Node object.
*
* A Bolt is defined as a Node that returns a result as soon as a message is
* ingested.
*
* A Spout is defined as a Node that returns a result eventually, when
* a certain condition is met and emitReady is true.
*
* When implementing a Spout, create a condition by overriding emitReady.
* When implementing a Bolt, leave emitReady as-is.
*/
consume: function _Node_consume(message) {
/*
* Process a message
* @returns a promise that resolves when processing is complete
*/
this.ingest(message);
if (this.emitReady()) {
return this.flush();
}
return this.emitDeferred.promise;
},
ingest: function _Node_ingest(message) {
/*
* Takes a message to be processed
*/
throw new NotImplementedError("ingest implementation not found");
},
emitReady: function _Node_emitReady() {
/*
* Returns when output should be flushed
* @returns boolean
*/
return true;
},
flush: function _Node_flush() {
/*
* Emit whatever's left and clear result store
* @returns a resolved promise
*/
let deferred = this.emitDeferred;
deferred.resolve(this.results);
this.clear();
return deferred.promise;
},
clear: function _Node_clear() {
/*
* Clear any temporary data
*/
this.results = null;
this.emitDeferred = Promise.defer();
}
}
function createNode(options) {
function EasyNode() {
this.results = null;
this.emitDeferred = Promise.defer();
}
let properties = {};
for (let key of Object.keys(options)) {
properties[key] = {value: options[key]};
}
EasyNode.prototype = Object.create(Node.prototype, properties);
return new EasyNode();
}
function Stream() {
this.objects = {};
this.listensTo = {};
this.heads = {};
}
Stream.prototype = {
addNode: function _stream_addNode(obj, isHead) {
if (!this.listensTo[obj.listenType]) {
this.listensTo[obj.listenType] = [];
}
this.listensTo[obj.listenType] = [obj.identifier];
this.objects[obj.identifier] = obj;
if (isHead) {
this.heads[obj.listenType] = true;
}
},
push: function _stream_push(messageType, message) {
/*
* Push a message down the stream, from the top
* @returns a promise that gets resolved when all downstream tasks are done
*/
let deferred = Promise.defer();
if (!(this.heads.hasOwnProperty(messageType) && this.listensTo.hasOwnProperty(messageType))) {
deferred.reject("cannot push a non-head message");
}
else {
let subTasks = [];
let handlers = this.listensTo[messageType];
for (let handlerName of handlers) {
subTasks.push(this._process(handlerName, message));
}
Promise.all(subTasks).then(() => {
deferred.resolve();
},
error => {
Cu.reportError(error);
deferred.reject(error);
});
}
return deferred.promise;
},
_process: function _stream__process(objectIdent, message) {
/*
* Process a task, until there is no more to do
*/
return Task.spawn(function _stream_process_task() {
let workerQueue = [objectIdent];
let messageQueue = [message];
while (workerQueue.length > 0) {
let worker = this.objects[workerQueue.shift()];
let workerMessage = messageQueue.shift();
let newMessage = yield worker.consume(workerMessage);
let listeners = this.listensTo[worker.emitType] || [];
for (let ident of listeners) {
workerQueue.push(ident);
messageQueue.push(newMessage);
}
}
}.bind(this));
},
flush: function _stream_flush() {
let flushList = [];
for (let objName in this.heads) {
flushList.push(this.objects[objName].flush());
}
return Promise.all(flushList);
},
}
exports.Node = Node;
exports.createNode = createNode;
exports.Stream = Stream;

Просмотреть файл

@ -0,0 +1,90 @@
"use strict";
const {Cc, Ci, Cu} = require("chrome");
Cu.import("resource://gre/modules/Services.jsm");
Cu.import("resource://gre/modules/Task.jsm");
const test = require("sdk/test");
const Promise = require("sdk/core/promise");
const {Stream, Node, createNode} = require("streams/core");
exports["test addNode"] = function test_addNode(assert) {
let dummySpout = createNode({
identifier: "dummySpout",
listenType: "dummyInput",
emitType: "dummyOutput",
});
let stream = new Stream();
stream.addNode(dummySpout);
assert.equal(Object.keys(stream.objects).length, 1, "addNode adds to a list");
assert.equal(Object.keys(stream.listensTo["dummyInput"]).length, 1, "addNode populates listener list");
assert.equal(stream.listensTo["dummyInput"][0], "dummySpout", "addNode populates dummyInput listeners");
let dummyBolt = createNode({
identifier: "dummyBolt",
listenType: "dummyOutput",
emitType: "dummierOutput",
});
stream.addNode(dummyBolt);
assert.equal(Object.keys(stream.objects).length, 2, "addNode adds another object to a list");
assert.equal(Object.keys(stream.listensTo["dummyOutput"]).length, 1, "addNode populates dummyOutput listeners");
}
exports["test push linear topology"] = function test_flush(assert, done) {
/*
* Tests a simple topology:
* Stream = [Spout -> Bolt -> Bolt]
*/
Task.spawn(function() {
let boltDeferred = Promise.defer();
let pairSpout = createNode({
identifier: "twoMsgSpout",
listenType: "lonelyMessage",
emitType: "pairMessages",
ingest: function(message) {
if (!this.results) {
this.results = [];
}
this.results.push(message);
},
emitReady: function() {
return this.results.length > 1;
}
});
let capitalizeBolt = createNode({
identifier: "capitalizeBolt",
listenType: "pairMessages",
emitType: "capitalizedPairs",
ingest: function(messages) {
this.results = [];
for (let message of messages) {
this.results.push(message.toUpperCase());
}
}
});
let resultAsserterBolt = createNode({
identifier: "resultAsserterBolt",
listenType: "capitalizedPairs",
emitType: null,
ingest: function(messages) {
boltDeferred.resolve();
assert.equal(messages.length, 2);
}
});
let stream = new Stream();
stream.addNode(pairSpout, true);
stream.addNode(capitalizeBolt);
stream.addNode(resultAsserterBolt);
let pushPromise = stream.push("lonelyMessage", "message 1");
stream.push("lonelyMessage", "message 2");
yield boltDeferred.promise;
yield pushPromise;
}).then(done);
};
test.run(exports);