consistent interface for bolts and streams: implementations use an object and a factory

This commit is contained in:
Olivier Yiptong 2014-05-28 10:55:16 -04:00
Родитель fe653a63de
Коммит 3601b448b4
7 изменённых файлов: 204 добавлений и 181 удалений

Просмотреть файл

@ -97,13 +97,17 @@ Node.prototype = {
}
}
function createNode(options) {
function createNode(options, locals) {
function EasyNode() {
this.results = null;
this.emitDeferred = Promise.defer();
for (let key in locals) {
this[key] = locals[key];
}
}
let properties = {};
for (let key of Object.keys(options)) {
for (let key in options) {
properties[key] = {value: options[key]};
}
EasyNode.prototype = Object.create(Node.prototype, properties);

Просмотреть файл

@ -4,99 +4,105 @@ const {storage} = require("sdk/simple-storage");
const {createNode} = require("streams/core");
const {DateUtils} = require("DateUtils");
let dailyInterestsSpout = createNode({
identifier: "dailyInterestSpout",
listenType: "interest",
emitType: "dailyInterests",
let DailyInterestsSpout = {
create: function _DIS_create(storageBackend) {
let node = createNode({
identifier: "dailyInterestSpout",
listenType: "interest",
emitType: "dailyInterests",
_storeInterest: function _DIS__storeInterest(host, visitDate, visitCount, namespace, type, interest) {
if (!storage.dayBufferInterests) {
storage.dayBufferInterests = {};
}
if (!storage.dayBufferInterests[visitDate]) {
storage.dayBufferInterests[visitDate] = {};
}
if (!storage.dayBufferInterests[visitDate][type]) {
storage.dayBufferInterests[visitDate][type] = {};
}
if (!storage.dayBufferInterests[visitDate][type][namespace]) {
storage.dayBufferInterests[visitDate][type][namespace] = {};
}
if (!storage.dayBufferInterests[visitDate][type][namespace][interest]) {
storage.dayBufferInterests[visitDate][type][namespace][interest] = {};
}
if (!storage.dayBufferInterests[visitDate][type][namespace][interest][host]) {
storage.dayBufferInterests[visitDate][type][namespace][interest][host] = 0;
}
storage.dayBufferInterests[visitDate][type][namespace][interest][host] += visitCount;
},
_storeInterest: function _DIS__storeInterest(host, visitDate, visitCount, namespace, type, interest) {
if (!this.storage.dayBufferInterests) {
this.storage.dayBufferInterests = {};
}
if (!this.storage.dayBufferInterests[visitDate]) {
this.storage.dayBufferInterests[visitDate] = {};
}
if (!this.storage.dayBufferInterests[visitDate][type]) {
this.storage.dayBufferInterests[visitDate][type] = {};
}
if (!this.storage.dayBufferInterests[visitDate][type][namespace]) {
this.storage.dayBufferInterests[visitDate][type][namespace] = {};
}
if (!this.storage.dayBufferInterests[visitDate][type][namespace][interest]) {
this.storage.dayBufferInterests[visitDate][type][namespace][interest] = {};
}
if (!this.storage.dayBufferInterests[visitDate][type][namespace][interest][host]) {
this.storage.dayBufferInterests[visitDate][type][namespace][interest][host] = 0;
}
this.storage.dayBufferInterests[visitDate][type][namespace][interest][host] += visitCount;
},
ingest: function _DIS_ingest(message) {
if (!message) {
return
}
let {details, dateVisits} = message;
let {host, visitDate, visitCount, namespace, results} = details;
results.forEach(item => {
let {type, interests} = item;
interests.forEach(interest => {
Object.keys(dateVisits).forEach(date => {
this._storeInterest(host, date, dateVisits[date], namespace, type, interest);
ingest: function _DIS_ingest(message) {
if (!message) {
return
}
let {details, dateVisits} = message;
let {host, visitDate, visitCount, namespace, results} = details;
results.forEach(item => {
let {type, interests} = item;
interests.forEach(interest => {
Object.keys(dateVisits).forEach(date => {
this._storeInterest(host, date, dateVisits[date], namespace, type, interest);
});
});
});
});
});
},
},
emitReady: function _DIS_emitReady() {
let dates = Object.keys(storage.dayBufferInterests);
emitReady: function _DIS_emitReady() {
let dates = Object.keys(this.storage.dayBufferInterests);
// check that we have more than one. having only one may mean that we're
// still adding interests for visits
if (dates.length < 2) {
return false;
}
// check that we have more than one. having only one may mean that we're
// still adding interests for visits
if (dates.length < 2) {
return false;
}
// sort by dates, latest-first
// return everything except latest day
dates = dates.sort(function (a,b) {
return parseInt(b) - parseInt(a);
});
let pushDays = dates.slice(1, dates.length);
let pushData = {};
for (let i=0; i < pushDays.length; i++) {
let day = pushDays[i];
pushData[day] = storage.dayBufferInterests[day];
delete storage.dayBufferInterests[day];
}
this.results = pushData;
// sort by dates, latest-first
// return everything except latest day
dates = dates.sort(function (a,b) {
return parseInt(b) - parseInt(a);
});
let pushDays = dates.slice(1, dates.length);
let pushData = {};
for (let i=0; i < pushDays.length; i++) {
let day = pushDays[i];
pushData[day] = this.storage.dayBufferInterests[day];
delete this.storage.dayBufferInterests[day];
}
this.results = pushData;
if (this._emitCb) {
this._emitCb(DateUtils.today() - dates[0]);
}
return true;
},
if (this._emitCb) {
this._emitCb(DateUtils.today() - dates[0]);
}
return true;
},
flush: function _DIS_flush() {
let deferred = this.emitDeferred;
if (this.results) {
// invoked when emitReady
deferred.resolve(this.results);
}
else {
// invoked directly
deferred.resolve(storage.dayBufferInterests);
}
this.clear();
return deferred.promise;
},
flush: function _DIS_flush() {
let deferred = this.emitDeferred;
if (this.results) {
// invoked when emitReady
deferred.resolve(this.results);
}
else {
// invoked directly
deferred.resolve(this.storage.dayBufferInterests);
}
this.clear();
return deferred.promise;
},
clearStorage: function _DIS_clearStorage() {
delete storage.dayBufferInterests;
},
clearStorage: function _DIS_clearStorage() {
delete this.storage.dayBufferInterests;
},
setEmitCallback: function(cb) {
this._emitCb = cb;
setEmitCallback: function(cb) {
this._emitCb = cb;
}
}, {storage: storageBackend || storage});
return node;
}
});
};
exports.dailyInterestsSpout = dailyInterestsSpout;
exports.DailyInterestsSpout = DailyInterestsSpout;

Просмотреть файл

@ -8,75 +8,77 @@ const {mergeObjects} = require("Utils");
* Takes dailyInterests messages and ranks the interests by occurences over daily
* occurrences. It does not take into account the visit counts.
*/
let makeRanker = function makeRanker(namespace, type) {
let capitalizedType = type.charAt(0).toUpperCase() + type.slice(1);
let dailyInterestRanker = createNode({
identifier: namespace+capitalizedType+"Ranker",
listenType: "dailyInterests",
namespace: namespace,
type: type,
storageKey: "daycount_" + namespace + "_" + type,
emitType: null,
let DayCountRankerBolt = {
create: function _DCRB_create(namespace, type, storageBackend) {
let capitalizedType = type.charAt(0).toUpperCase() + type.slice(1);
let node = createNode({
identifier: namespace+capitalizedType+"Ranker",
listenType: "dailyInterests",
namespace: namespace,
type: type,
storageKey: "daycount_" + namespace + "_" + type,
emitType: null,
init: function _DIR_init() {
if (!storage.ranking) {
storage.ranking = {};
}
if (!storage.ranking[this.storageKey]) {
storage.ranking[this.storageKey] = {};
}
this.interests = storage.ranking[this.storageKey];
},
init: function _DIR_init() {
if (!this.storage.ranking) {
this.storage.ranking = {};
}
if (!this.storage.ranking[this.storageKey]) {
this.storage.ranking[this.storageKey] = {};
}
this.interests = this.storage.ranking[this.storageKey];
},
ingest: function _DIR_ingest(message) {
for (let day in message) {
let typeObject = message[day][this.type];
if (typeObject && typeObject[this.namespace]) {
let namespace = typeObject[this.namespace];
for (let interest in namespace) {
if (!this.interests.hasOwnProperty(interest)) {
this.interests[interest] = 0;
ingest: function _DIR_ingest(message) {
for (let day in message) {
let typeObject = message[day][this.type];
if (typeObject && typeObject[this.namespace]) {
let namespace = typeObject[this.namespace];
for (let interest in namespace) {
if (!this.interests.hasOwnProperty(interest)) {
this.interests[interest] = 0;
}
this.interests[interest] += 1;
}
this.interests[interest] += 1;
}
}
this.saveRanking();
},
getInterests: function _DIR_getInterests() {
if (Object.keys(this.storage.ranking[this.storageKey]).length > 0) {
return JSON.parse(JSON.stringify(this.storage.ranking[this.storageKey]));
}
return null;
},
getRanking: function _DIR_getRanking() {
let ranking = [];
let interests = this.getInterests() || {};
Object.keys(interests).sort(function (a,b) {
return interests[b] - interests[a];
}).forEach(interest => {
ranking.push({interest: interest, score: interests[interest]});
});
return ranking;
},
saveRanking: function _DIR_saveRanking() {
let storageData = this.storage.ranking[this.storageKey];
mergeObjects(storageData, this.interests);
},
clearData: function _DIR_clear() {
this.interests = {};
this.storage.ranking[this.storageKey] = {};
},
clearStorage: function _DIR_clearStorage() {
this.storage.ranking[this.storageKey];
}
this.saveRanking();
},
getInterests: function _DIR_getInterests() {
if (Object.keys(storage.ranking[this.storageKey]).length > 0) {
return JSON.parse(JSON.stringify(storage.ranking[this.storageKey]));
}
return null;
},
getRanking: function _DIR_getRanking() {
let ranking = [];
let interests = this.getInterests() || {};
Object.keys(interests).sort(function (a,b) {
return interests[b] - interests[a];
}).forEach(interest => {
ranking.push({interest: interest, score: interests[interest]});
});
return ranking;
},
saveRanking: function _DIR_saveRanking() {
let storageData = storage.ranking[this.storageKey];
mergeObjects(storageData, this.interests);
},
clearData: function _DIR_clear() {
this.interests = {};
storage.ranking[this.storageKey] = {};
},
clearStorage: function _DIR_clearStorage() {
delete storage.ranking[this.storageKey];
}
});
return dailyInterestRanker;
}, {storage: storageBackend || storage});
return node;
}
};
exports.makeRanker = makeRanker;
exports.DayCountRankerBolt = DayCountRankerBolt;

Просмотреть файл

@ -6,30 +6,35 @@ const {createNode} = require("streams/core");
* Takes dailyInterests messages and strips the hosts out of them, while keeping
* the counts.
*/
let hostStripBolt = createNode({
identifier: "hostStripBolt",
listenType: "dailyInterests",
emitType: "hostlessInterests",
ingest: function _HSB_ingest(message) {
for (let timeKey in message) {
let period = message[timeKey];
for (let typeKey in period) {
let type = period[typeKey];
for (let nsKey in type) {
let namespace = type[nsKey];
for (let interestKey in namespace) {
let interest = namespace[interestKey];
let counts = [];
for (let hostKey in interest) {
counts.push(interest[hostKey]);
let HostStripBolt = {
create: function _HSB_create() {
let node = createNode({
identifier: "hostStripBolt",
listenType: "dailyInterests",
emitType: "hostlessInterests",
ingest: function _HSB_ingest(message) {
for (let timeKey in message) {
let period = message[timeKey];
for (let typeKey in period) {
let type = period[typeKey];
for (let nsKey in type) {
let namespace = type[nsKey];
for (let interestKey in namespace) {
let interest = namespace[interestKey];
let counts = [];
for (let hostKey in interest) {
counts.push(interest[hostKey]);
}
namespace[interestKey] = counts;
}
}
namespace[interestKey] = counts;
}
}
}
}
this.results = message;
},
});
this.results = message;
},
});
return node;
}
}
exports.hostStripBolt = hostStripBolt;
exports.HostStripBolt = HostStripBolt;

Просмотреть файл

@ -11,7 +11,7 @@ const Promise = require("sdk/core/promise");
Cu.import("resource://gre/modules/Task.jsm");
const {DateUtils} = require("DateUtils");
const {dailyInterestsSpout} = require("streams/dailyInterestsSpout");
const {DailyInterestsSpout} = require("streams/dailyInterestsSpout");
const {Stream, createNode} = require("streams/core");
const test = require("sdk/test");
@ -50,6 +50,7 @@ exports["test visit processing"] = function test_PastVisits(assert, done) {
}
});
let dailyInterestsSpout = DailyInterestsSpout.create({});
let stream = new Stream();
stream.addNode(dailyInterestsSpout, true);
stream.addNode(assertionBolt);
@ -94,6 +95,7 @@ exports["test ignore latest day visit unless flush"] = function test_TodayVisits
}
});
let dailyInterestsSpout = DailyInterestsSpout.create({});
let stream = new Stream();
stream.addNode(dailyInterestsSpout, true);
stream.addNode(assertionBolt);
@ -138,6 +140,7 @@ exports["test emit callback"] = function test_TodayVisits(assert, done) {
dateVisits = {}
dateVisits[today-2] = 1
let dailyInterestsSpout = DailyInterestsSpout.create({});
let stream = new Stream();
stream.addNode(dailyInterestsSpout, true);

Просмотреть файл

@ -4,7 +4,7 @@ const {Cc, Ci, Cu} = require("chrome");
Cu.import("resource://gre/modules/Task.jsm");
const Promise = require("sdk/core/promise");
const test = require("sdk/test");
const {makeRanker} = require("streams/dayCountRankerBolt");
const {DayCountRankerBolt} = require("streams/dayCountRankerBolt");
const {mergeObjects} = require("Utils");
let createMessage = function(namespace, type, options) {
@ -32,15 +32,16 @@ exports["test persistence"] = function test_persistence(assert, done) {
let namespace = "namespace";
let type = "persistence_test";
let dataStore = {};
ranker = makeRanker(namespace, type);
ranker = DayCountRankerBolt.create(namespace, type, dataStore);
yield ranker.consume({
"1": createMessage(namespace, type, {numAutos: 1}),
"2": createMessage(namespace, type, {numAutos: 1}),
});
assert.equal(ranker.getInterests().Autos, 2, "ranking should accumulate");
// now recreate ranker and add two more days
ranker = makeRanker(namespace, type);
ranker = DayCountRankerBolt.create(namespace, type, dataStore);
yield ranker.consume({
"3": createMessage(namespace, type, {numAutos: 1}),
"4": createMessage(namespace, type, {numAutos: 1}),
@ -56,9 +57,10 @@ exports["test persistence"] = function test_persistence(assert, done) {
exports["test storage keys"] = function test_storageKeys(assert, done) {
Task.spawn(function() {
try {
let dataStore = {};
let rankerMeta = [{ns: "namespace", t: "storagekey_test"}, {ns: "namespace1", t: "storagekey_test1"}];
let ranker = makeRanker(rankerMeta[0].ns, rankerMeta[0].t);
let ranker1 = makeRanker(rankerMeta[1].ns, rankerMeta[1].t);
let ranker = DayCountRankerBolt.create(rankerMeta[0].ns, rankerMeta[0].t, dataStore);
let ranker1 = DayCountRankerBolt.create(rankerMeta[1].ns, rankerMeta[1].t, dataStore);
let makeMergeMessage = function() {
let msg1 = createMessage(rankerMeta[0].ns, rankerMeta[0].t, {numAutos: 1});
@ -94,7 +96,7 @@ exports["test ranking"] = function test_ranking(assert, done) {
let namespace = "namespace";
let type = "ranking_test";
let ranker = makeRanker(namespace, type);
let ranker = DayCountRankerBolt.create(namespace, type, {});
yield ranker.consume({
"1": createMessage(namespace, type, {numAutos: 1, numSports: 2}),

Просмотреть файл

@ -12,7 +12,7 @@ Cu.import("resource://gre/modules/Task.jsm");
const {DateUtils} = require("DateUtils");
const {testUtils} = require("./helpers");
const {hostStripBolt} = require("streams/hostStripBolt");
const {HostStripBolt} = require("streams/hostStripBolt");
const {Stream, createNode} = require("streams/core");
const test = require("sdk/test");
@ -34,6 +34,7 @@ dailyInterestMessage[today] = {
exports["test host strip"] = function test_hostStrip(assert, done) {
Task.spawn(function() {
let hostStripBolt = HostStripBolt.create();
let stripped = yield hostStripBolt.consume(dailyInterestMessage);
assert.equal(Object.keys(stripped[today]["rules"]["edrules"]).length, 2, "stripped data contains the same number of categories");
assert.deepEqual(stripped[today]["rules"]["edrules"]["Autos"], [3], "stripped data contains enumerated data");