diff --git a/lib/crawler.js b/lib/crawler.js index 956c7de..f3119a9 100644 --- a/lib/crawler.js +++ b/lib/crawler.js @@ -156,9 +156,10 @@ class Crawler { _acquireLock(request) { - debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_acquireLock(${loopName}:${request.toUniqueString()}): enter`); if (!request.url || !this.locker || request.requiresLock === false) { - debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return Q(request); } const self = this; @@ -166,14 +167,14 @@ class Crawler { return this.trace(self.locker.lock(request.url, self.options.processingTtl || 60 * 1000), 'lock'); }).then( lock => { - debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (success)`); request.lock = lock; return request; }, error => { // If we could not acquire a lock, requeue. If the "error" is a normal Exceeded scenario, requeue normally // noting that we could not get a lock. For any other error, requeue and capture the error for debugging. - debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (error)`); + debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (error)`); if (error.message.startsWith('Exceeded')) { return request.markRequeue('Collision', 'Could not lock'); } @@ -183,9 +184,10 @@ class Crawler { } _releaseLock(request) { - debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_releaseLock(${loopName}:${request.toUniqueString()}): enter`); if (!request.lock || !this.locker) { - debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return Q(request); } const self = this; @@ -193,12 +195,12 @@ class Crawler { return this.locker.unlock(request.lock); }).then( () => { - debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (success)`); request.lock = null; return request; }, error => { - debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (error)`); + debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (error)`); request.lock = null; self.logger.error(error); return request; @@ -297,7 +299,8 @@ class Crawler { } _requeue(request) { - debug(`_requeue(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_requeue(${loopName}:${request.toUniqueString()}): enter`); return Q.try(() => { request.attemptCount = request.attemptCount || 0; if (++request.attemptCount > 5) { @@ -307,34 +310,36 @@ class Crawler { const queuable = request.createRequeuable(); return this.queues.repush(request, queuable); }).then(result => { - debug(`_requeue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_requeue(${loopName}:${request.toUniqueString()}): exit (success)`); return result; }); } _filter(request) { - debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_filter(${loopName}:${request.toUniqueString()}): enter`); if (request.shouldSkip()) { - debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_filter(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return request; } if (!request.type || !request.url) { // park the malformed request in the dead queue for debugging and ignore the returned promise - debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (malformed)`); + debug(`_filter(${loopName}:${request.toUniqueString()}): exit (malformed)`); return this._storeDeadletter(request, `Detected malformed request ${request.toString()}`); } if (this._shouldFilter(request)) { - debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (success - filtered)`); + debug(`_filter(${loopName}:${request.toUniqueString()}): exit (success - filtered)`); request.markSkip('Declined'); } - debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (success - not filtered)`); + debug(`_filter(${loopName}:${request.toUniqueString()}): exit (success - not filtered)`); return request; } _fetch(request) { - debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_fetch(${loopName}:${request.toUniqueString()}): enter`); if (request.shouldSkip()) { - debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return request; } if (request.payload) { @@ -345,21 +350,22 @@ class Crawler { if (request.payload.fetchedAt) { request.response.headers.fetchedAt = request.payload.fetchedAt; } - debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (success - payload)`); + debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (success - payload)`); return request; } return this._logStartEnd('fetching', request, () => { return this.fetcher.fetch(request); }).then(request => { - debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (success - fetched)`); + debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (success - fetched)`); return request; }); } _convertToDocument(request) { - debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): enter`); if (request.shouldSkip()) { - debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return Q(request); } @@ -405,14 +411,15 @@ class Crawler { if (typeof request.document === 'string') console.log('got a string document'); request.document._metadata = metadata; - debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): exit (success)`); return Q(request); } _processDocument(request) { - debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_processDocument(${loopName}:${request.toUniqueString()}): enter`); if (request.shouldSkip()) { - debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_processDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return Q(request); } // if the request is a delete, mark the document as deleted and be done @@ -423,7 +430,7 @@ class Crawler { } return this._logStartEnd('processing', request, () => { request.document = this.processor.process(request); - debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_processDocument(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }); } @@ -454,9 +461,10 @@ class Crawler { } _storeDocument(request) { - debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_storeDocument(${loopName}:${request.toUniqueString()}): enter`); if (request.shouldSkip() || !request.shouldSave()) { - debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`); + debug(`_storeDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`); return Q(request); } @@ -464,26 +472,28 @@ class Crawler { return this.store.upsert(request.document).then(upsert => { request.upsert = upsert; request.addMeta({ write: Date.now() - start }); - debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_storeDocument(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }); } _deleteFromQueue(request) { - debug(`_deleteFromQueue(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_deleteFromQueue(${loopName}:${request.toUniqueString()}): enter`); return Q.try(() => { return this.queues.done(request).then(() => { - debug(`_deleteFromQueue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_deleteFromQueue(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }); }); } _abandonInQueue(request) { - debug(`_abandonInQueue(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_abandonInQueue(${loopName}:${request.toUniqueString()}): enter`); return Q.try(() => { return this.queues.abandon(request).then(() => { - debug(`_abandonInQueue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_abandonInQueue(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }); }); @@ -509,10 +519,11 @@ class Crawler { } storeDeadletter(request, reason = null) { - debug(`_storeDeadletter(${request.meta.loopName}:${request.toUniqueString()}): enter`); + const loopName = request.meta ? request.meta.loopName : ''; + debug(`_storeDeadletter(${loopName}:${request.toUniqueString()}): enter`); const document = this._createDeadletter(request, reason); return this.deadletters.upsert(document).then(() => { - debug(`_storeDeadletter(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_storeDeadletter(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }); } diff --git a/lib/crawlerFactory.js b/lib/crawlerFactory.js index c74a9b0..4ec8ed8 100644 --- a/lib/crawlerFactory.js +++ b/lib/crawlerFactory.js @@ -118,7 +118,7 @@ class CrawlerFactory { static createService(name) { factoryLogger.info('appInitStart'); const crawlerName = config.get('CRAWLER_NAME') || 'crawler'; - const optionsProvider = config.get('CRAWLER_OPTIONS_PROVIDER') || 'memory'; + const optionsProvider = name === 'InMemory' ? 'memory' : (config.get('CRAWLER_OPTIONS_PROVIDER') || 'memory'); const subsystemNames = ['crawler', 'fetcher', 'queuing', 'storage', 'locker']; const crawlerPromise = CrawlerFactory.createRefreshingOptions(crawlerName, subsystemNames, optionsProvider).then(options => { factoryLogger.info(`creating refreshingOption completed`); @@ -150,6 +150,7 @@ class CrawlerFactory { options.queuing.metricsStore = null; options.locker.provider = 'memory'; options.storage.provider = 'memory'; + options.storage.delta.provider = 'none'; return options; } @@ -441,7 +442,7 @@ class CrawlerFactory { } static createDeltaStore(inner, options) { - if (!options.delta || !options.delta.provider) { + if (!options.delta || !options.delta.provider || options.delta.provider === 'none') { return inner; } factoryLogger.info(`creating delta store`); diff --git a/providers/fetcher/githubFetcher.js b/providers/fetcher/githubFetcher.js index 7ce3fbe..482edf8 100644 --- a/providers/fetcher/githubFetcher.js +++ b/providers/fetcher/githubFetcher.js @@ -265,13 +265,12 @@ class GitHubFetcher { _getTypeDetails(type) { const result = { orgs: { tokenTraits: ['admin'] }, - org: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.korra-preview+json'} }, - repos: { tokenTraits: [['admin'], ['public']] }, - repo: { tokenTraits: [['admin'], ['public']] }, + org: { tokenTraits: [['admin'], ['public']] }, + repos: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.mercy-preview+json' } }, + repo: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.mercy-preview+json' } }, teams: { tokenTraits: ['admin'] }, team: { tokenTraits: ['admin'] }, members: { tokenTraits: ['admin'] }, - // update_events: { tokenTraits: ['admin'] }, events: { tokenTraits: ['admin'] }, collaborators: { tokenTraits: ['admin'], headers: { Accept: 'application/vnd.github.korra-preview' } }, reviews: { headers: { Accept: 'application/vnd.github.black-cat-preview+json' } }, diff --git a/providers/queuing/queueSet.js b/providers/queuing/queueSet.js index b6a59a5..2c4bdf8 100644 --- a/providers/queuing/queueSet.js +++ b/providers/queuing/queueSet.js @@ -91,6 +91,7 @@ class QueueSet { _createStartMap(weights) { // Create a simple table of which queue to pop based on the weights supplied. For each queue, // look up its weight and add that many entries in the map. If no weight is included, assume 1. + weights = weights || {}; const result = []; for (let i = 0; i < this.queues.length; i++) { const count = weights[this.queues[i].getName()] || 1; diff --git a/test/unit/crawlerTests.js b/test/unit/crawlerTests.js index e5f2242..72e75ef 100644 --- a/test/unit/crawlerTests.js +++ b/test/unit/crawlerTests.js @@ -15,38 +15,6 @@ const sinon = require('sinon'); const TraversalPolicy = require('../../lib/traversalPolicy'); describe('Crawler get request', () => { - it('should get from the priority queue first', () => { - const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } }); - const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } }); - const queues = createBaseQueues({ priority: priority, normal: normal }); - const locker = createBaseLocker({ lock: () => { return Q('locked'); } }); - const crawler = createBaseCrawler({ queues: queues, locker: locker }); - const requestBox = []; - return crawler._getRequest(requestBox, { name: 'test' }).then(request => { - expect(request.type).to.be.equal('priority'); - expect(request._originQueue === priority).to.be.true; - expect(request.lock).to.be.equal('locked'); - expect(request.meta.cid).to.be.not.null; - expect(request).to.be.equal(requestBox[0]); - }); - }); - - it('should get from the normal queue if no priority', () => { - const priority = createBaseQueue('priority', { pop: () => { return Q(null); } }); - const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } }); - const queues = createBaseQueues({ priority: priority, normal: normal }); - const locker = createBaseLocker({ lock: () => { return Q('locked'); } }); - const crawler = createBaseCrawler({ queues: queues, locker: locker }); - const requestBox = []; - return crawler._getRequest(requestBox, { name: 'test' }).then(request => { - expect(request.type).to.be.equal('normal'); - expect(request._originQueue === normal).to.be.true; - expect(request.lock).to.be.equal('locked'); - expect(request.meta.cid).to.be.not.null; - expect(request).to.be.equal(requestBox[0]); - }); - }); - it('should return a dummy skip/delay request if none are queued', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(null); } }); const normal = createBaseQueue('normal', { pop: () => { return Q(null); } }); @@ -341,24 +309,24 @@ describe('Crawler requeue', () => { it('should requeue in deadletter queue after 5 attempts', () => { let queue = []; - const deadletterQueue = []; const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } }); - const deadletter = createBaseQueue('deadletter', { push: request => { deadletterQueue.push(request); return Q(); } }); - const queues = createBaseQueues({ normal: normal, deadletter: deadletter }); + const queues = createBaseQueues({ normal: normal }); + const deadletterStorage = []; + const deadletters = createDeadletterStore({ upsert: document => { deadletterStorage.push(document); return Q(); } }); const request = new Request('test', 'http://api.github.com/repo/microsoft/test'); request.attemptCount = 5; request.markRequeue(); request._retryQueue = 'normal'; - const crawler = createBaseCrawler({ queues: queues }); + const crawler = createBaseCrawler({ queues: queues, deadletters: deadletters }); request.crawler = crawler; return crawler._requeue(request).then(() => { queue = [].concat.apply([], queue); expect(queue.length).to.be.equal(0); - expect(deadletterQueue.length).to.be.equal(1); - expect(deadletterQueue[0] !== request).to.be.true; - expect(deadletterQueue[0].type === request.type).to.be.true; - expect(deadletterQueue[0].url === request.url).to.be.true; - expect(deadletterQueue[0].attemptCount).to.be.equal(6); + expect(deadletterStorage.length).to.be.equal(1); + expect(deadletterStorage[0] !== request).to.be.true; + expect(deadletterStorage[0].type === request.type).to.be.true; + expect(deadletterStorage[0].url === request.url).to.be.true; + expect(deadletterStorage[0].attemptCount).to.be.equal(6); }); }); }); @@ -764,7 +732,6 @@ describe('Crawler whole meal deal', () => { it('should process normal requests', () => { const crawler = createFullCrawler(); const normal = crawler.queues.queueTable['normal']; - const priority = crawler.queues.queueTable['priority']; const request = new Request('user', 'http://test.com/users/user1'); request.policy = TraversalPolicy.reload('user'); @@ -772,8 +739,7 @@ describe('Crawler whole meal deal', () => { crawler.fetcher.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })]; return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then( () => { - expect(priority.pop.callCount).to.be.equal(1, 'priority call count'); - expect(normal.pop.callCount).to.be.equal(1, 'normal call count'); + expect(normal.pop.callCount).to.be.equal(1); const lock = crawler.locker.lock; expect(lock.callCount).to.be.equal(1, 'lock call count'); @@ -804,13 +770,8 @@ describe('Crawler whole meal deal', () => { error => assert.fail()); }); - it('should empty request queues', () => { - // TODO - }); - it('should handle getRequest reject', () => { const crawler = createFullCrawler(); - const priority = crawler.queues.queueTable['priority']; // setup a problem popping const normal = createBaseQueue('normal'); @@ -824,7 +785,6 @@ describe('Crawler whole meal deal', () => { crawler.fetcher.responses = [createResponse(null, 500)]; return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then( () => { - expect(priority.pop.callCount).to.be.equal(1); expect(normal.pop.callCount).to.be.equal(1); const lock = crawler.locker.lock; @@ -857,7 +817,6 @@ describe('Crawler whole meal deal', () => { it('should handle fetch reject', () => { const crawler = createFullCrawler(); const normal = crawler.queues.queueTable['normal']; - const priority = crawler.queues.queueTable['priority']; // setup a good request but a server error response normal.requests = [new Request('user', 'http://test.com/users/user1')]; @@ -866,7 +825,6 @@ describe('Crawler whole meal deal', () => { return crawler.processOne({ name: 'test' }); }).then( () => { - expect(priority.pop.callCount).to.be.equal(1); expect(normal.pop.callCount).to.be.equal(1); const lock = crawler.locker.lock; @@ -897,14 +855,12 @@ describe('Crawler whole meal deal', () => { it('should handle process document reject', () => { const crawler = createFullCrawler(); crawler.processor = { process: () => { throw new Error('bad processor') } }; - const priority = crawler.queues.queueTable['priority']; const normal = crawler.queues.queueTable['normal']; normal.requests = [new Request('user', 'http://test.com/users/user1')]; crawler.fetcher.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })]; return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then( () => { - expect(priority.pop.callCount).to.be.equal(1); expect(normal.pop.callCount).to.be.equal(1); const lock = crawler.locker.lock; @@ -1035,7 +991,7 @@ function createFullCrawler() { return Q(request); }); - const GitHubProcessor = require('../lib/githubProcessor'); + const GitHubProcessor = require('../../providers/fetcher/githubProcessor'); const processor = new GitHubProcessor(); sinon.spy(processor, 'process'); @@ -1074,12 +1030,12 @@ function createErrorResponse(error) { }; } -function createBaseCrawler({ queues = createBaseQueues(), store = createBaseStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), fetcher = null, options = createBaseOptions() } = {}) { +function createBaseCrawler({ queues = createBaseQueues(), store = createBaseStore(), deadletters = createDeadletterStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), fetcher = null, options = createBaseOptions() } = {}) { if (!fetcher) { fetcher = createBaseFetcher(); } const processor = new GitHubProcessor(store); - return new Crawler(queues, store, locker, fetcher, processor, options.crawler); + return new Crawler(queues, store, deadletters, locker, fetcher, processor, options.crawler); } function createBaseOptions(logger = createBaseLog()) { @@ -1120,7 +1076,7 @@ function createBaseOptions(logger = createBaseLog()) { } function createBaseQueues({ priority = null, normal = null, deadletter = null, options = null } = {}) { - return new QueueSet([priority || createBaseQueue('priority'), normal || createBaseQueue('normal')], deadletter || createBaseQueue('deadletter'), (options || createBaseOptions()).queuing); + return new QueueSet([priority || createBaseQueue('priority'), normal || createBaseQueue('normal')], (options || createBaseOptions()).queuing); } function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null } = {}) { @@ -1141,6 +1097,12 @@ function createBaseStore({ etag = null, upsert = null, get = null } = {}) { return result; } +function createDeadletterStore({ upsert = null } = {}) { + const result = {}; + result.upsert = upsert || (() => { assert.fail('should not upsert'); }); + return result; +} + function createBaseLog({ log = null, info = null, warn = null, error = null, verbose = null, silly = null } = {}) { const result = {}; result.log = log || (() => { }); diff --git a/test/unit/githubFetcherTests.js b/test/unit/githubFetcherTests.js index f8cdfbe..08db2a7 100644 --- a/test/unit/githubFetcherTests.js +++ b/test/unit/githubFetcherTests.js @@ -84,7 +84,7 @@ describe('GitHub fetcher', () => { expect(request.shouldRequeue()).to.be.false; expect(request.shouldSkip()).to.be.false; expect(fetcher.tokenFactory.exhaust.callCount).to.be.equal(1); - expect(fetcher.tokenFactory.exhaust.getCall(0).args[1]).to.be.approximately(resetTime * 1000, 20); + expect(fetcher.tokenFactory.exhaust.getCall(0).args[1]).to.be.approximately(resetTime * 1000 + 5000, 20); }); }); diff --git a/test/unit/queueSetTests.js b/test/unit/queueSetTests.js index 5c83ccb..541bf07 100644 --- a/test/unit/queueSetTests.js +++ b/test/unit/queueSetTests.js @@ -16,77 +16,27 @@ describe('QueueSet construction', () => { describe('QueueSet weighting', () => { it('should create a simple startMap', () => { - const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2])); + const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], createOptions({ '1': 3, '2': 2 })); expect(set.startMap.length).to.be.equal(5); expect(set.startMap[0]).to.be.equal(0); + expect(set.startMap[1]).to.be.equal(0); expect(set.startMap[2]).to.be.equal(0); expect(set.startMap[3]).to.be.equal(1); expect(set.startMap[4]).to.be.equal(1); }); it('should create a default startMap if no weights given', () => { - const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, { _config: { on: () => { } } }); - expect(set.startMap.length).to.be.equal(1); + const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], { _config: { on: () => { } } }); + expect(set.startMap.length).to.be.equal(2); expect(set.startMap[0]).to.be.equal(0); - }); - - it('should throw if too many weights are given', () => { - expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2, 1]))).to.throw(Error); + expect(set.startMap[1]).to.be.equal(1); }); it('should throw if no weights are given', () => { - expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, [])).to.throw(Error); + expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], {})).to.throw(Error); }); - it('should create a simple startMap', () => { - const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2])); - expect(set.startMap.length).to.be.equal(5); - expect(set.startMap[0]).to.be.equal(0); - expect(set.startMap[2]).to.be.equal(0); - expect(set.startMap[3]).to.be.equal(1); - expect(set.startMap[4]).to.be.equal(1); - }); - - it('should pop from first with default weights', () => { - const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); - - return Q.all([queues.pop(), queues.pop()]).spread((first, second) => { - expect(first.type).to.be.equal('priority'); - expect(first._originQueue === priority).to.be.true; - expect(second.type).to.be.equal('priority'); - expect(second._originQueue === priority).to.be.true; - }); - }); - - it('should pop in order when requests always available', () => { - const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } }); - const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } }); - const queues = createBaseQueues([priority, normal], null, [1, 1]); - - return Q.all([queues.pop(), queues.pop()]).spread((first, second) => { - expect(first.type).to.be.equal('priority'); - expect(first._originQueue === priority).to.be.true; - expect(second.type).to.be.equal('normal'); - expect(second._originQueue === normal).to.be.true; - }); - }); - - it('should pop from subsequent if previous queue is empty', () => { - const priority = createBaseQueue('priority', { pop: () => { return Q(null); } }); - const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } }); - const queues = createBaseQueues([priority, normal], null, [1, 1]); - - return Q.all([queues.pop(), queues.pop()]).spread((first, second) => { - expect(first.type).to.be.equal('normal'); - expect(first._originQueue === normal).to.be.true; - expect(second.type).to.be.equal('normal'); - expect(second._originQueue === normal).to.be.true; - }); - }); - - it('should pop earlier queue if starting later and nothing available', () => { + it('should pop other queue if nothing available', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } }); const normal = createBaseQueue('normal', { pop: () => { return Q(null); } }); const queues = createBaseQueues([priority, normal], null, [1, 1]); @@ -126,8 +76,7 @@ describe('QueueSet pushing', () => { it('should repush into the same queue', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, push: request => { return Q(); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); + const queues = createBaseQueues([priority]); sinon.spy(priority, 'push'); return queues.pop().then(request => { @@ -143,8 +92,7 @@ describe('QueueSet pushing', () => { describe('QueueSet originQueue management', () => { it('should call done and mark acked on done', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, done: request => { return Q(); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); + const queues = createBaseQueues([priority]); sinon.spy(priority, 'done'); return queues.pop().then(request => { @@ -158,8 +106,7 @@ describe('QueueSet originQueue management', () => { it('should call done and mark acked on abandon', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); + const queues = createBaseQueues([priority]); sinon.spy(priority, 'abandon'); return queues.pop().then(request => { @@ -173,8 +120,7 @@ describe('QueueSet originQueue management', () => { it('should not abandon twice', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); + const queues = createBaseQueues([priority]); sinon.spy(priority, 'abandon'); return queues.pop().then(request => { @@ -190,8 +136,7 @@ describe('QueueSet originQueue management', () => { it('should not done after abandon ', () => { const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); }, done: request => { return Q(); } }); - const normal = createBaseQueue('normal'); - const queues = createBaseQueues([priority, normal]); + const queues = createBaseQueues([priority]); sinon.spy(priority, 'abandon'); sinon.spy(priority, 'done'); @@ -209,35 +154,29 @@ describe('QueueSet originQueue management', () => { }); describe('QueueSet subscription management', () => { - it('should subscribe all, including deadletter', () => { + it('should subscribe all', () => { const priority = createBaseQueue('priority', { subscribe: () => { } }); const normal = createBaseQueue('normal', { subscribe: () => { } }); - const deadletter = createBaseQueue('deadletter', { subscribe: () => { } }); - const queues = createBaseQueues([priority, normal], deadletter); + const queues = createBaseQueues([priority, normal]); sinon.spy(priority, 'subscribe'); sinon.spy(normal, 'subscribe'); - sinon.spy(deadletter, 'subscribe'); return queues.subscribe().then(() => { expect(priority.subscribe.callCount).to.be.equal(1); expect(normal.subscribe.callCount).to.be.equal(1); - expect(deadletter.subscribe.callCount).to.be.equal(1); }); }); - it('should unsubscribe all, including deadletter', () => { + it('should unsubscribe all', () => { const priority = createBaseQueue('priority', { unsubscribe: () => { } }); const normal = createBaseQueue('normal', { unsubscribe: () => { } }); - const deadletter = createBaseQueue('deadletter', { unsubscribe: () => { } }); - const queues = createBaseQueues([priority, normal], deadletter); + const queues = createBaseQueues([priority, normal]); sinon.spy(priority, 'unsubscribe'); sinon.spy(normal, 'unsubscribe'); - sinon.spy(deadletter, 'unsubscribe'); return queues.unsubscribe().then(() => { expect(priority.unsubscribe.callCount).to.be.equal(1); expect(normal.unsubscribe.callCount).to.be.equal(1); - expect(deadletter.unsubscribe.callCount).to.be.equal(1); }); }); }); @@ -249,8 +188,8 @@ function createOptions(weights) { }; } -function createBaseQueues(queues, deadletter, weights = [1]) { - return new QueueSet(queues, deadletter || createBaseQueue('deadletter'), createOptions(weights)); +function createBaseQueues(queues, weights = null) { + return new QueueSet(queues, createOptions(weights)); } function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null, subscribe = null, unsubscribe = null } = {}) {