Fix component to handle entity deletions properly.

Also update writer to close on end of stream.
This commit is contained in:
Irakli Gozalishvili 2012-10-28 20:49:59 -07:00
Родитель 909728bc02
Коммит 07dbdc01a7
2 изменённых файлов: 21 добавлений и 8 удалений

Просмотреть файл

@ -5,15 +5,18 @@ var state = require("./state")
var channel = require("reducers/channel")
var emit = require("reducers/emit")
var filter = require("reducers/filter")
var takeWhile = require("reducers/take-while")
var map = require("reducers/map")
var reduce = require("reducers/reduce")
var flatten = require("reducers/flatten")
var concat = require("reducers/concat")
var patch = require("diffpatcher/patch")
var has = require("./util/has")
var field = require("./util/field")
var association = require("util/association")
var isnt = require("./util/isnt")
var keys = Object.keys
@ -84,19 +87,26 @@ function component(read, write) {
reduce(changes, function react(state, delta) {
keys(delta).forEach(function forEachEntity(id) {
if (delta[id] === null) {
state[id] = null
}
else if (state[id] != void(0)) {
// If it's first time we're seeing this item we create a
// fork of state updates for it.
if (state[id] === void(0)) {
// This specific change is already being dispatched which means
// that reading from `changes` now won't include it. Although
// entity associated with this specific update should still
// get it. There for we prepend changes with a current `delta`.
var fork = concat(delta, changes)
// Filter stream of changes to a stream of changes to the entity
// with a given id.
var entityChanges = filter(changes, has(id))
var entityChanges = filter(fork, has(id))
// Map changes for the given entity to an actual change deltas
var entityUpdates = map(entityChanges, field(id))
// Take updates only up until update state is `null` since that
// means close of the entity.
var updates = takeWhile(entityUpdates, isnt(null))
// And write entity updates into entity. Note that initial write
// as one here will actually create that entity returning it back.
// Also all subsequent updates will be automatically written into it.
var entity = write(entityUpdates, options)
var entity = write(updates, options)
// Start reading stream of changes caused by the entity. It is also
// mapped back to same structure of updates as changes were.
var input = read(entity, options)
@ -114,7 +124,7 @@ function component(read, write) {
// Return joined stream of all inputs from all the entities of this
// component.
flatten(inputs)
return flatten(inputs)
}
}

Просмотреть файл

@ -32,10 +32,13 @@ function writer(swap, close, open) {
return function write(input, output, options) {
output = output || open(options)
reduce(input, function(state, update) {
if (update === null) close(output)
else swap(output, update)
return update
})
// Once reduction of input is complete close. `reduce` always returns
// value equivalent of sequence with a sequence of single value representing
// result of accumulation.
reduce(result, function() { close(options) })
return output
}
}