diff --git a/component.js b/component.js index 0ec7427..be8e707 100644 --- a/component.js +++ b/component.js @@ -5,15 +5,18 @@ var state = require("./state") var channel = require("reducers/channel") var emit = require("reducers/emit") var filter = require("reducers/filter") +var takeWhile = require("reducers/take-while") var map = require("reducers/map") var reduce = require("reducers/reduce") var flatten = require("reducers/flatten") +var concat = require("reducers/concat") var patch = require("diffpatcher/patch") var has = require("./util/has") var field = require("./util/field") var association = require("util/association") +var isnt = require("./util/isnt") var keys = Object.keys @@ -84,19 +87,26 @@ function component(read, write) { reduce(changes, function react(state, delta) { keys(delta).forEach(function forEachEntity(id) { - if (delta[id] === null) { - state[id] = null - } - else if (state[id] != void(0)) { + // If it's first time we're seeing this item we create a + // fork of state updates for it. + if (state[id] === void(0)) { + // This specific change is already being dispatched which means + // that reading from `changes` now won't include it. Although + // entity associated with this specific update should still + // get it. There for we prepend changes with a current `delta`. + var fork = concat(delta, changes) // Filter stream of changes to a stream of changes to the entity // with a given id. - var entityChanges = filter(changes, has(id)) + var entityChanges = filter(fork, has(id)) // Map changes for the given entity to an actual change deltas var entityUpdates = map(entityChanges, field(id)) + // Take updates only up until update state is `null` since that + // means close of the entity. + var updates = takeWhile(entityUpdates, isnt(null)) // And write entity updates into entity. Note that initial write // as one here will actually create that entity returning it back. // Also all subsequent updates will be automatically written into it. - var entity = write(entityUpdates, options) + var entity = write(updates, options) // Start reading stream of changes caused by the entity. It is also // mapped back to same structure of updates as changes were. var input = read(entity, options) @@ -114,7 +124,7 @@ function component(read, write) { // Return joined stream of all inputs from all the entities of this // component. - flatten(inputs) + return flatten(inputs) } } diff --git a/writer.js b/writer.js index 404010b..f3d6aa3 100644 --- a/writer.js +++ b/writer.js @@ -32,10 +32,13 @@ function writer(swap, close, open) { return function write(input, output, options) { output = output || open(options) reduce(input, function(state, update) { - if (update === null) close(output) else swap(output, update) return update }) + // Once reduction of input is complete close. `reduce` always returns + // value equivalent of sequence with a sequence of single value representing + // result of accumulation. + reduce(result, function() { close(options) }) return output } }