diff --git a/lib/streams/core.js b/lib/streams/core.js index b2a791a..6a3ed7d 100644 --- a/lib/streams/core.js +++ b/lib/streams/core.js @@ -97,13 +97,17 @@ Node.prototype = { } } -function createNode(options) { +function createNode(options, locals) { function EasyNode() { this.results = null; this.emitDeferred = Promise.defer(); + + for (let key in locals) { + this[key] = locals[key]; + } } let properties = {}; - for (let key of Object.keys(options)) { + for (let key in options) { properties[key] = {value: options[key]}; } EasyNode.prototype = Object.create(Node.prototype, properties); diff --git a/lib/streams/dailyInterestsSpout.js b/lib/streams/dailyInterestsSpout.js index 4a04bba..a11d350 100644 --- a/lib/streams/dailyInterestsSpout.js +++ b/lib/streams/dailyInterestsSpout.js @@ -4,99 +4,105 @@ const {storage} = require("sdk/simple-storage"); const {createNode} = require("streams/core"); const {DateUtils} = require("DateUtils"); -let dailyInterestsSpout = createNode({ - identifier: "dailyInterestSpout", - listenType: "interest", - emitType: "dailyInterests", +let DailyInterestsSpout = { + create: function _DIS_create(storageBackend) { + let node = createNode({ + identifier: "dailyInterestSpout", + listenType: "interest", + emitType: "dailyInterests", - _storeInterest: function _DIS__storeInterest(host, visitDate, visitCount, namespace, type, interest) { - if (!storage.dayBufferInterests) { - storage.dayBufferInterests = {}; - } - if (!storage.dayBufferInterests[visitDate]) { - storage.dayBufferInterests[visitDate] = {}; - } - if (!storage.dayBufferInterests[visitDate][type]) { - storage.dayBufferInterests[visitDate][type] = {}; - } - if (!storage.dayBufferInterests[visitDate][type][namespace]) { - storage.dayBufferInterests[visitDate][type][namespace] = {}; - } - if (!storage.dayBufferInterests[visitDate][type][namespace][interest]) { - storage.dayBufferInterests[visitDate][type][namespace][interest] = {}; - } - if (!storage.dayBufferInterests[visitDate][type][namespace][interest][host]) { - storage.dayBufferInterests[visitDate][type][namespace][interest][host] = 0; - } - storage.dayBufferInterests[visitDate][type][namespace][interest][host] += visitCount; - }, + _storeInterest: function _DIS__storeInterest(host, visitDate, visitCount, namespace, type, interest) { + if (!this.storage.dayBufferInterests) { + this.storage.dayBufferInterests = {}; + } + if (!this.storage.dayBufferInterests[visitDate]) { + this.storage.dayBufferInterests[visitDate] = {}; + } + if (!this.storage.dayBufferInterests[visitDate][type]) { + this.storage.dayBufferInterests[visitDate][type] = {}; + } + if (!this.storage.dayBufferInterests[visitDate][type][namespace]) { + this.storage.dayBufferInterests[visitDate][type][namespace] = {}; + } + if (!this.storage.dayBufferInterests[visitDate][type][namespace][interest]) { + this.storage.dayBufferInterests[visitDate][type][namespace][interest] = {}; + } + if (!this.storage.dayBufferInterests[visitDate][type][namespace][interest][host]) { + this.storage.dayBufferInterests[visitDate][type][namespace][interest][host] = 0; + } + this.storage.dayBufferInterests[visitDate][type][namespace][interest][host] += visitCount; + }, - ingest: function _DIS_ingest(message) { - if (!message) { - return - } - let {details, dateVisits} = message; - let {host, visitDate, visitCount, namespace, results} = details; - results.forEach(item => { - let {type, interests} = item; - interests.forEach(interest => { - Object.keys(dateVisits).forEach(date => { - this._storeInterest(host, date, dateVisits[date], namespace, type, interest); + ingest: function _DIS_ingest(message) { + if (!message) { + return + } + let {details, dateVisits} = message; + let {host, visitDate, visitCount, namespace, results} = details; + results.forEach(item => { + let {type, interests} = item; + interests.forEach(interest => { + Object.keys(dateVisits).forEach(date => { + this._storeInterest(host, date, dateVisits[date], namespace, type, interest); + }); + }); }); - }); - }); - }, + }, - emitReady: function _DIS_emitReady() { - let dates = Object.keys(storage.dayBufferInterests); + emitReady: function _DIS_emitReady() { + let dates = Object.keys(this.storage.dayBufferInterests); - // check that we have more than one. having only one may mean that we're - // still adding interests for visits - if (dates.length < 2) { - return false; - } + // check that we have more than one. having only one may mean that we're + // still adding interests for visits + if (dates.length < 2) { + return false; + } - // sort by dates, latest-first - // return everything except latest day - dates = dates.sort(function (a,b) { - return parseInt(b) - parseInt(a); - }); - let pushDays = dates.slice(1, dates.length); - let pushData = {}; - for (let i=0; i < pushDays.length; i++) { - let day = pushDays[i]; - pushData[day] = storage.dayBufferInterests[day]; - delete storage.dayBufferInterests[day]; - } - this.results = pushData; + // sort by dates, latest-first + // return everything except latest day + dates = dates.sort(function (a,b) { + return parseInt(b) - parseInt(a); + }); + let pushDays = dates.slice(1, dates.length); + let pushData = {}; + for (let i=0; i < pushDays.length; i++) { + let day = pushDays[i]; + pushData[day] = this.storage.dayBufferInterests[day]; + delete this.storage.dayBufferInterests[day]; + } + this.results = pushData; - if (this._emitCb) { - this._emitCb(DateUtils.today() - dates[0]); - } - return true; - }, + if (this._emitCb) { + this._emitCb(DateUtils.today() - dates[0]); + } + return true; + }, - flush: function _DIS_flush() { - let deferred = this.emitDeferred; - if (this.results) { - // invoked when emitReady - deferred.resolve(this.results); - } - else { - // invoked directly - deferred.resolve(storage.dayBufferInterests); - } - this.clear(); - return deferred.promise; - }, + flush: function _DIS_flush() { + let deferred = this.emitDeferred; + if (this.results) { + // invoked when emitReady + deferred.resolve(this.results); + } + else { + // invoked directly + deferred.resolve(this.storage.dayBufferInterests); + } + this.clear(); + return deferred.promise; + }, - clearStorage: function _DIS_clearStorage() { - delete storage.dayBufferInterests; - }, + clearStorage: function _DIS_clearStorage() { + delete this.storage.dayBufferInterests; + }, - setEmitCallback: function(cb) { - this._emitCb = cb; + setEmitCallback: function(cb) { + this._emitCb = cb; + } + }, {storage: storageBackend || storage}); + + return node; } -}); +}; -exports.dailyInterestsSpout = dailyInterestsSpout; +exports.DailyInterestsSpout = DailyInterestsSpout; diff --git a/lib/streams/dayCountRankerBolt.js b/lib/streams/dayCountRankerBolt.js index dbd9e79..dcd8f5c 100644 --- a/lib/streams/dayCountRankerBolt.js +++ b/lib/streams/dayCountRankerBolt.js @@ -8,75 +8,77 @@ const {mergeObjects} = require("Utils"); * Takes dailyInterests messages and ranks the interests by occurences over daily * occurrences. It does not take into account the visit counts. */ -let makeRanker = function makeRanker(namespace, type) { - let capitalizedType = type.charAt(0).toUpperCase() + type.slice(1); - let dailyInterestRanker = createNode({ - identifier: namespace+capitalizedType+"Ranker", - listenType: "dailyInterests", - namespace: namespace, - type: type, - storageKey: "daycount_" + namespace + "_" + type, - emitType: null, +let DayCountRankerBolt = { + create: function _DCRB_create(namespace, type, storageBackend) { + let capitalizedType = type.charAt(0).toUpperCase() + type.slice(1); + let node = createNode({ + identifier: namespace+capitalizedType+"Ranker", + listenType: "dailyInterests", + namespace: namespace, + type: type, + storageKey: "daycount_" + namespace + "_" + type, + emitType: null, - init: function _DIR_init() { - if (!storage.ranking) { - storage.ranking = {}; - } - if (!storage.ranking[this.storageKey]) { - storage.ranking[this.storageKey] = {}; - } - this.interests = storage.ranking[this.storageKey]; - }, + init: function _DIR_init() { + if (!this.storage.ranking) { + this.storage.ranking = {}; + } + if (!this.storage.ranking[this.storageKey]) { + this.storage.ranking[this.storageKey] = {}; + } + this.interests = this.storage.ranking[this.storageKey]; + }, - ingest: function _DIR_ingest(message) { - for (let day in message) { - let typeObject = message[day][this.type]; - if (typeObject && typeObject[this.namespace]) { - let namespace = typeObject[this.namespace]; - for (let interest in namespace) { - if (!this.interests.hasOwnProperty(interest)) { - this.interests[interest] = 0; + ingest: function _DIR_ingest(message) { + for (let day in message) { + let typeObject = message[day][this.type]; + if (typeObject && typeObject[this.namespace]) { + let namespace = typeObject[this.namespace]; + for (let interest in namespace) { + if (!this.interests.hasOwnProperty(interest)) { + this.interests[interest] = 0; + } + this.interests[interest] += 1; } - this.interests[interest] += 1; } } + this.saveRanking(); + }, + + getInterests: function _DIR_getInterests() { + if (Object.keys(this.storage.ranking[this.storageKey]).length > 0) { + return JSON.parse(JSON.stringify(this.storage.ranking[this.storageKey])); + } + return null; + }, + + getRanking: function _DIR_getRanking() { + let ranking = []; + let interests = this.getInterests() || {}; + Object.keys(interests).sort(function (a,b) { + return interests[b] - interests[a]; + }).forEach(interest => { + ranking.push({interest: interest, score: interests[interest]}); + }); + return ranking; + }, + + saveRanking: function _DIR_saveRanking() { + let storageData = this.storage.ranking[this.storageKey]; + mergeObjects(storageData, this.interests); + }, + + clearData: function _DIR_clear() { + this.interests = {}; + this.storage.ranking[this.storageKey] = {}; + }, + + clearStorage: function _DIR_clearStorage() { + this.storage.ranking[this.storageKey]; } - this.saveRanking(); - }, - - getInterests: function _DIR_getInterests() { - if (Object.keys(storage.ranking[this.storageKey]).length > 0) { - return JSON.parse(JSON.stringify(storage.ranking[this.storageKey])); - } - return null; - }, - - getRanking: function _DIR_getRanking() { - let ranking = []; - let interests = this.getInterests() || {}; - Object.keys(interests).sort(function (a,b) { - return interests[b] - interests[a]; - }).forEach(interest => { - ranking.push({interest: interest, score: interests[interest]}); - }); - return ranking; - }, - - saveRanking: function _DIR_saveRanking() { - let storageData = storage.ranking[this.storageKey]; - mergeObjects(storageData, this.interests); - }, - - clearData: function _DIR_clear() { - this.interests = {}; - storage.ranking[this.storageKey] = {}; - }, - - clearStorage: function _DIR_clearStorage() { - delete storage.ranking[this.storageKey]; - } - }); - return dailyInterestRanker; + }, {storage: storageBackend || storage}); + return node; + } }; -exports.makeRanker = makeRanker; +exports.DayCountRankerBolt = DayCountRankerBolt; diff --git a/lib/streams/hostStripBolt.js b/lib/streams/hostStripBolt.js index 83c12bd..fc76480 100644 --- a/lib/streams/hostStripBolt.js +++ b/lib/streams/hostStripBolt.js @@ -6,30 +6,35 @@ const {createNode} = require("streams/core"); * Takes dailyInterests messages and strips the hosts out of them, while keeping * the counts. */ -let hostStripBolt = createNode({ - identifier: "hostStripBolt", - listenType: "dailyInterests", - emitType: "hostlessInterests", - ingest: function _HSB_ingest(message) { - for (let timeKey in message) { - let period = message[timeKey]; - for (let typeKey in period) { - let type = period[typeKey]; - for (let nsKey in type) { - let namespace = type[nsKey]; - for (let interestKey in namespace) { - let interest = namespace[interestKey]; - let counts = []; - for (let hostKey in interest) { - counts.push(interest[hostKey]); +let HostStripBolt = { + create: function _HSB_create() { + let node = createNode({ + identifier: "hostStripBolt", + listenType: "dailyInterests", + emitType: "hostlessInterests", + ingest: function _HSB_ingest(message) { + for (let timeKey in message) { + let period = message[timeKey]; + for (let typeKey in period) { + let type = period[typeKey]; + for (let nsKey in type) { + let namespace = type[nsKey]; + for (let interestKey in namespace) { + let interest = namespace[interestKey]; + let counts = []; + for (let hostKey in interest) { + counts.push(interest[hostKey]); + } + namespace[interestKey] = counts; + } } - namespace[interestKey] = counts; } } - } - } - this.results = message; - }, -}); + this.results = message; + }, + }); + return node; + } +} -exports.hostStripBolt = hostStripBolt; +exports.HostStripBolt = HostStripBolt; diff --git a/test/test-streams-dailyInterestsSpout.js b/test/test-streams-dailyInterestsSpout.js index 25dcb90..c17d0d3 100644 --- a/test/test-streams-dailyInterestsSpout.js +++ b/test/test-streams-dailyInterestsSpout.js @@ -11,7 +11,7 @@ const Promise = require("sdk/core/promise"); Cu.import("resource://gre/modules/Task.jsm"); const {DateUtils} = require("DateUtils"); -const {dailyInterestsSpout} = require("streams/dailyInterestsSpout"); +const {DailyInterestsSpout} = require("streams/dailyInterestsSpout"); const {Stream, createNode} = require("streams/core"); const test = require("sdk/test"); @@ -50,6 +50,7 @@ exports["test visit processing"] = function test_PastVisits(assert, done) { } }); + let dailyInterestsSpout = DailyInterestsSpout.create({}); let stream = new Stream(); stream.addNode(dailyInterestsSpout, true); stream.addNode(assertionBolt); @@ -94,6 +95,7 @@ exports["test ignore latest day visit unless flush"] = function test_TodayVisits } }); + let dailyInterestsSpout = DailyInterestsSpout.create({}); let stream = new Stream(); stream.addNode(dailyInterestsSpout, true); stream.addNode(assertionBolt); @@ -138,6 +140,7 @@ exports["test emit callback"] = function test_TodayVisits(assert, done) { dateVisits = {} dateVisits[today-2] = 1 + let dailyInterestsSpout = DailyInterestsSpout.create({}); let stream = new Stream(); stream.addNode(dailyInterestsSpout, true); diff --git a/test/test-streams-dayCountRankerBolt.js b/test/test-streams-dayCountRankerBolt.js index 13cfbd6..3d4b323 100644 --- a/test/test-streams-dayCountRankerBolt.js +++ b/test/test-streams-dayCountRankerBolt.js @@ -4,7 +4,7 @@ const {Cc, Ci, Cu} = require("chrome"); Cu.import("resource://gre/modules/Task.jsm"); const Promise = require("sdk/core/promise"); const test = require("sdk/test"); -const {makeRanker} = require("streams/dayCountRankerBolt"); +const {DayCountRankerBolt} = require("streams/dayCountRankerBolt"); const {mergeObjects} = require("Utils"); let createMessage = function(namespace, type, options) { @@ -32,15 +32,16 @@ exports["test persistence"] = function test_persistence(assert, done) { let namespace = "namespace"; let type = "persistence_test"; + let dataStore = {}; - ranker = makeRanker(namespace, type); + ranker = DayCountRankerBolt.create(namespace, type, dataStore); yield ranker.consume({ "1": createMessage(namespace, type, {numAutos: 1}), "2": createMessage(namespace, type, {numAutos: 1}), }); assert.equal(ranker.getInterests().Autos, 2, "ranking should accumulate"); // now recreate ranker and add two more days - ranker = makeRanker(namespace, type); + ranker = DayCountRankerBolt.create(namespace, type, dataStore); yield ranker.consume({ "3": createMessage(namespace, type, {numAutos: 1}), "4": createMessage(namespace, type, {numAutos: 1}), @@ -56,9 +57,10 @@ exports["test persistence"] = function test_persistence(assert, done) { exports["test storage keys"] = function test_storageKeys(assert, done) { Task.spawn(function() { try { + let dataStore = {}; let rankerMeta = [{ns: "namespace", t: "storagekey_test"}, {ns: "namespace1", t: "storagekey_test1"}]; - let ranker = makeRanker(rankerMeta[0].ns, rankerMeta[0].t); - let ranker1 = makeRanker(rankerMeta[1].ns, rankerMeta[1].t); + let ranker = DayCountRankerBolt.create(rankerMeta[0].ns, rankerMeta[0].t, dataStore); + let ranker1 = DayCountRankerBolt.create(rankerMeta[1].ns, rankerMeta[1].t, dataStore); let makeMergeMessage = function() { let msg1 = createMessage(rankerMeta[0].ns, rankerMeta[0].t, {numAutos: 1}); @@ -94,7 +96,7 @@ exports["test ranking"] = function test_ranking(assert, done) { let namespace = "namespace"; let type = "ranking_test"; - let ranker = makeRanker(namespace, type); + let ranker = DayCountRankerBolt.create(namespace, type, {}); yield ranker.consume({ "1": createMessage(namespace, type, {numAutos: 1, numSports: 2}), diff --git a/test/test-streams-hostStripBolt.js b/test/test-streams-hostStripBolt.js index a6d94e6..19aa606 100644 --- a/test/test-streams-hostStripBolt.js +++ b/test/test-streams-hostStripBolt.js @@ -12,7 +12,7 @@ Cu.import("resource://gre/modules/Task.jsm"); const {DateUtils} = require("DateUtils"); const {testUtils} = require("./helpers"); -const {hostStripBolt} = require("streams/hostStripBolt"); +const {HostStripBolt} = require("streams/hostStripBolt"); const {Stream, createNode} = require("streams/core"); const test = require("sdk/test"); @@ -34,6 +34,7 @@ dailyInterestMessage[today] = { exports["test host strip"] = function test_hostStrip(assert, done) { Task.spawn(function() { + let hostStripBolt = HostStripBolt.create(); let stripped = yield hostStripBolt.consume(dailyInterestMessage); assert.equal(Object.keys(stripped[today]["rules"]["edrules"]).length, 2, "stripped data contains the same number of categories"); assert.deepEqual(stripped[today]["rules"]["edrules"]["Autos"], [3], "stripped data contains enumerated data");