This commit is contained in:
Olivier Yiptong 2014-05-22 12:18:37 -04:00
Родитель 92d86ce0c4
Коммит d75d19598b
2 изменённых файлов: 56 добавлений и 2 удалений

Просмотреть файл

@ -163,8 +163,11 @@ Stream.prototype = {
flush: function _stream_flush() {
let flushList = [];
for (let objName in this.heads) {
flushList.push(this.objects[objName].flush());
for (let messageType in this.heads) {
let handlers = this.listensTo[messageType];
for (let handlerName of handlers) {
flushList.push(this.objects[handlerName].flush());
}
}
return Promise.all(flushList);
},

Просмотреть файл

@ -217,4 +217,55 @@ exports["test push complex topology"] = function test_push_complex(assert, done)
}).then(done);
}
/*
* Tests flushing a stream
*/
exports["test flush"] = function test_flush(assert, done) {
// spout that waits until a count reaches at least 5 to push
let bufferFiveSpout = createNode({
identifier: "bufferFiveSpout",
listenType: "incr",
emitType: "bufferedCount",
ingest: function(count) {
if (!this.results) {
this.results = 0;
}
this.results += count;
},
emitReady: function() {
return this.results >= 5;
}
});
let doAssert;
let assertionBolt = createNode({
identifier: "assertionBolt",
listenType: "bufferedCount",
emitType: null,
ingest: function(count) {
doAssert(count);
}
});
let stream = new Stream();
stream.addNode(bufferFiveSpout, true);
stream.addNode(assertionBolt);
let pushPromise;
// make sure buffering until five works
stream.push("incr", 4);
doAssert = function(count) {
assert.equal(count, 5);
}
yield stream.push("incr", 1);
assert.equal(bufferFiveSpout.results, null);
// flushing should override the buffering behavior
stream.push("incr", 1);
doAssert = function(count) {
assert.equal(count, 1);
}
yield stream.flush();
}
test.run(exports);