Merge pull request #81 from Microsoft/jm/fixTest

Fix up tests broken during refactoring
Gene Hazan 2017-04-07 13:22:16 -07:00 committed by GitHub
Parents 2851389fa6 de09c0314e
Commit 745635a9f0
7 changed files with 91 additions and 178 deletions

View file

@@ -156,9 +156,10 @@ class Crawler {
_acquireLock(request) {
debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_acquireLock(${loopName}:${request.toUniqueString()}): enter`);
if (!request.url || !this.locker || request.requiresLock === false) {
debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return Q(request);
}
const self = this;
@@ -166,14 +167,14 @@ class Crawler {
return this.trace(self.locker.lock(request.url, self.options.processingTtl || 60 * 1000), 'lock');
}).then(
lock => {
debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (success)`);
request.lock = lock;
return request;
},
error => {
// If we could not acquire a lock, requeue. If the "error" is a normal Exceeded scenario, requeue normally
// noting that we could not get a lock. For any other error, requeue and capture the error for debugging.
debug(`_acquireLock(${request.meta.loopName}:${request.toUniqueString()}): exit (error)`);
debug(`_acquireLock(${loopName}:${request.toUniqueString()}): exit (error)`);
if (error.message.startsWith('Exceeded')) {
return request.markRequeue('Collision', 'Could not lock');
}
@@ -183,9 +184,10 @@ class Crawler {
}
_releaseLock(request) {
debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_releaseLock(${loopName}:${request.toUniqueString()}): enter`);
if (!request.lock || !this.locker) {
debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return Q(request);
}
const self = this;
@@ -193,12 +195,12 @@ class Crawler {
return this.locker.unlock(request.lock);
}).then(
() => {
debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (success)`);
request.lock = null;
return request;
},
error => {
debug(`_releaseLock(${request.meta.loopName}:${request.toUniqueString()}): exit (error)`);
debug(`_releaseLock(${loopName}:${request.toUniqueString()}): exit (error)`);
request.lock = null;
self.logger.error(error);
return request;
@@ -297,7 +299,8 @@ class Crawler {
}
_requeue(request) {
debug(`_requeue(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_requeue(${loopName}:${request.toUniqueString()}): enter`);
return Q.try(() => {
request.attemptCount = request.attemptCount || 0;
if (++request.attemptCount > 5) {
@@ -307,34 +310,36 @@
const queuable = request.createRequeuable();
return this.queues.repush(request, queuable);
}).then(result => {
debug(`_requeue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_requeue(${loopName}:${request.toUniqueString()}): exit (success)`);
return result;
});
}
_filter(request) {
debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_filter(${loopName}:${request.toUniqueString()}): enter`);
if (request.shouldSkip()) {
debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_filter(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return request;
}
if (!request.type || !request.url) {
// park the malformed request in the dead queue for debugging and ignore the returned promise
debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (malformed)`);
debug(`_filter(${loopName}:${request.toUniqueString()}): exit (malformed)`);
return this._storeDeadletter(request, `Detected malformed request ${request.toString()}`);
}
if (this._shouldFilter(request)) {
debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (success - filtered)`);
debug(`_filter(${loopName}:${request.toUniqueString()}): exit (success - filtered)`);
request.markSkip('Declined');
}
debug(`_filter(${request.meta.loopName}:${request.toUniqueString()}): exit (success - not filtered)`);
debug(`_filter(${loopName}:${request.toUniqueString()}): exit (success - not filtered)`);
return request;
}
_fetch(request) {
debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_fetch(${loopName}:${request.toUniqueString()}): enter`);
if (request.shouldSkip()) {
debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return request;
}
if (request.payload) {
@@ -345,21 +350,22 @@
if (request.payload.fetchedAt) {
request.response.headers.fetchedAt = request.payload.fetchedAt;
}
debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (success - payload)`);
debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (success - payload)`);
return request;
}
return this._logStartEnd('fetching', request, () => {
return this.fetcher.fetch(request);
}).then(request => {
debug(`_fetch(${request.meta.loopName}:${request.toUniqueString()}): exit (success - fetched)`);
debug(`_fetch(${loopName}:${request.toUniqueString()}): exit (success - fetched)`);
return request;
});
}
_convertToDocument(request) {
debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): enter`);
if (request.shouldSkip()) {
debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return Q(request);
}
@@ -405,14 +411,15 @@
if (typeof request.document === 'string')
console.log('got a string document');
request.document._metadata = metadata;
debug(`_convertToDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_convertToDocument(${loopName}:${request.toUniqueString()}): exit (success)`);
return Q(request);
}
_processDocument(request) {
debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_processDocument(${loopName}:${request.toUniqueString()}): enter`);
if (request.shouldSkip()) {
debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_processDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return Q(request);
}
// if the request is a delete, mark the document as deleted and be done
@@ -423,7 +430,7 @@
}
return this._logStartEnd('processing', request, () => {
request.document = this.processor.process(request);
debug(`_processDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_processDocument(${loopName}:${request.toUniqueString()}): exit (success)`);
return request;
});
}
@@ -454,9 +461,10 @@
}
_storeDocument(request) {
debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_storeDocument(${loopName}:${request.toUniqueString()}): enter`);
if (request.shouldSkip() || !request.shouldSave()) {
debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (nothing to do)`);
debug(`_storeDocument(${loopName}:${request.toUniqueString()}): exit (nothing to do)`);
return Q(request);
}
@@ -464,26 +472,28 @@
return this.store.upsert(request.document).then(upsert => {
request.upsert = upsert;
request.addMeta({ write: Date.now() - start });
debug(`_storeDocument(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_storeDocument(${loopName}:${request.toUniqueString()}): exit (success)`);
return request;
});
}
_deleteFromQueue(request) {
debug(`_deleteFromQueue(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_deleteFromQueue(${loopName}:${request.toUniqueString()}): enter`);
return Q.try(() => {
return this.queues.done(request).then(() => {
debug(`_deleteFromQueue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_deleteFromQueue(${loopName}:${request.toUniqueString()}): exit (success)`);
return request;
});
});
}
_abandonInQueue(request) {
debug(`_abandonInQueue(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_abandonInQueue(${loopName}:${request.toUniqueString()}): enter`);
return Q.try(() => {
return this.queues.abandon(request).then(() => {
debug(`_abandonInQueue(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_abandonInQueue(${loopName}:${request.toUniqueString()}): exit (success)`);
return request;
});
});
@@ -509,10 +519,11 @@
}
storeDeadletter(request, reason = null) {
debug(`_storeDeadletter(${request.meta.loopName}:${request.toUniqueString()}): enter`);
const loopName = request.meta ? request.meta.loopName : '';
debug(`_storeDeadletter(${loopName}:${request.toUniqueString()}): enter`);
const document = this._createDeadletter(request, reason);
return this.deadletters.upsert(document).then(() => {
debug(`_storeDeadletter(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`);
debug(`_storeDeadletter(${loopName}:${request.toUniqueString()}): exit (success)`);
return request;
});
}
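
Throughout crawler.js the change is the same: each pipeline method now derives the loop name defensively before logging, so a request constructed without a meta object (as several of the updated unit tests do) no longer throws inside the debug call. A minimal standalone sketch of the pattern, assuming only the debug package; the bare request object below is illustrative, not taken from the codebase:

const debug = require('debug')('example:crawler');

// Guard against requests that were created without a meta object.
function loopNameOf(request) {
  return request.meta ? request.meta.loopName : '';
}

// Illustrative request; toUniqueString mirrors the Request API used above.
const bareRequest = { toUniqueString: () => 'user:http://test' };
debug(`_acquireLock(${loopNameOf(bareRequest)}:${bareRequest.toUniqueString()}): enter`);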

View file

@@ -118,7 +118,7 @@ class CrawlerFactory {
static createService(name) {
factoryLogger.info('appInitStart');
const crawlerName = config.get('CRAWLER_NAME') || 'crawler';
const optionsProvider = config.get('CRAWLER_OPTIONS_PROVIDER') || 'memory';
const optionsProvider = name === 'InMemory' ? 'memory' : (config.get('CRAWLER_OPTIONS_PROVIDER') || 'memory');
const subsystemNames = ['crawler', 'fetcher', 'queuing', 'storage', 'locker'];
const crawlerPromise = CrawlerFactory.createRefreshingOptions(crawlerName, subsystemNames, optionsProvider).then(options => {
factoryLogger.info(`creating refreshingOption completed`);
@@ -150,6 +150,7 @@ class CrawlerFactory {
options.queuing.metricsStore = null;
options.locker.provider = 'memory';
options.storage.provider = 'memory';
options.storage.delta.provider = 'none';
return options;
}
@@ -441,7 +442,7 @@ class CrawlerFactory {
}
static createDeltaStore(inner, options) {
if (!options.delta || !options.delta.provider) {
if (!options.delta || !options.delta.provider || options.delta.provider === 'none') {
return inner;
}
factoryLogger.info(`creating delta store`);
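
The 'none' provider gives the in-memory configuration an explicit way to opt out of delta storage: the in-memory options above set options.storage.delta.provider = 'none', and createDeltaStore now returns the inner store untouched in that case. A minimal sketch of the guard under those assumptions; innerStore below is an illustrative stand-in for whatever store the factory wired up:

// Skip the delta wrapper when no provider is configured or it is explicitly 'none'.
function createDeltaStoreSketch(inner, options) {
  if (!options.delta || !options.delta.provider || options.delta.provider === 'none') {
    return inner;
  }
  throw new Error('provider-specific delta store wiring omitted in this sketch');
}

const innerStore = { upsert: () => Promise.resolve() };
const store = createDeltaStoreSketch(innerStore, { delta: { provider: 'none' } });
console.log(store === innerStore); // true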

View file

@@ -265,13 +265,12 @@ class GitHubFetcher {
_getTypeDetails(type) {
const result = {
orgs: { tokenTraits: ['admin'] },
org: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.korra-preview+json'} },
repos: { tokenTraits: [['admin'], ['public']] },
repo: { tokenTraits: [['admin'], ['public']] },
org: { tokenTraits: [['admin'], ['public']] },
repos: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.mercy-preview+json' } },
repo: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.mercy-preview+json' } },
teams: { tokenTraits: ['admin'] },
team: { tokenTraits: ['admin'] },
members: { tokenTraits: ['admin'] },
// update_events: { tokenTraits: ['admin'] },
events: { tokenTraits: ['admin'] },
collaborators: { tokenTraits: ['admin'], headers: { Accept: 'application/vnd.github.korra-preview' } },
reviews: { headers: { Accept: 'application/vnd.github.black-cat-preview+json' } },
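
The _getTypeDetails table maps each request type to the token traits and preview Accept header used when fetching it; this change drops the korra preview from org and adds the mercy (topics) preview to repos and repo. A small sketch of how such a table might be consulted when building request headers; the defaultHeaders value is illustrative, not from the fetcher:

// Merge any type-specific preview Accept header over a set of defaults.
function headersFor(type, typeDetails) {
  const defaultHeaders = { 'User-Agent': 'ghcrawler' }; // illustrative defaults
  const details = typeDetails[type] || {};
  return Object.assign({}, defaultHeaders, details.headers || {});
}

const typeDetails = {
  repos: { tokenTraits: [['admin'], ['public']], headers: { Accept: 'application/vnd.github.mercy-preview+json' } }
};
console.log(headersFor('repos', typeDetails).Accept); // application/vnd.github.mercy-preview+json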

View file

@@ -91,6 +91,7 @@ class QueueSet {
_createStartMap(weights) {
// Create a simple table of which queue to pop based on the weights supplied. For each queue,
// look up its weight and add that many entries in the map. If no weight is included, assume 1.
weights = weights || {};
const result = [];
for (let i = 0; i < this.queues.length; i++) {
const count = weights[this.queues[i].getName()] || 1;
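
With the weights = weights || {} guard, a QueueSet constructed without weights gets one startMap slot per queue (weight 1 each) instead of throwing. A self-contained sketch of the same table-building idea, using plain queue names in place of queue objects:

// Build a table of queue indices; a queue appears once per unit of weight, defaulting to 1.
function createStartMap(queueNames, weights) {
  weights = weights || {};
  const result = [];
  for (let i = 0; i < queueNames.length; i++) {
    const count = weights[queueNames[i]] || 1;
    for (let j = 0; j < count; j++) {
      result.push(i);
    }
  }
  return result;
}

console.log(createStartMap(['1', '2'], { '1': 3, '2': 2 })); // [ 0, 0, 0, 1, 1 ]
console.log(createStartMap(['priority', 'normal'], null));   // [ 0, 1 ]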

View file

@@ -15,38 +15,6 @@ const sinon = require('sinon');
const TraversalPolicy = require('../../lib/traversalPolicy');
describe('Crawler get request', () => {
it('should get from the priority queue first', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('priority');
expect(request._originQueue === priority).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.meta.cid).to.be.not.null;
expect(request).to.be.equal(requestBox[0]);
});
});
it('should get from the normal queue if no priority', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('normal');
expect(request._originQueue === normal).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.meta.cid).to.be.not.null;
expect(request).to.be.equal(requestBox[0]);
});
});
it('should return a dummy skip/delay request if none are queued', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
@@ -341,24 +309,24 @@ describe('Crawler requeue', () => {
it('should requeue in deadletter queue after 5 attempts', () => {
let queue = [];
const deadletterQueue = [];
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const deadletter = createBaseQueue('deadletter', { push: request => { deadletterQueue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal, deadletter: deadletter });
const queues = createBaseQueues({ normal: normal });
const deadletterStorage = [];
const deadletters = createDeadletterStore({ upsert: document => { deadletterStorage.push(document); return Q(); } });
const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
request.attemptCount = 5;
request.markRequeue();
request._retryQueue = 'normal';
const crawler = createBaseCrawler({ queues: queues });
const crawler = createBaseCrawler({ queues: queues, deadletters: deadletters });
request.crawler = crawler;
return crawler._requeue(request).then(() => {
queue = [].concat.apply([], queue);
expect(queue.length).to.be.equal(0);
expect(deadletterQueue.length).to.be.equal(1);
expect(deadletterQueue[0] !== request).to.be.true;
expect(deadletterQueue[0].type === request.type).to.be.true;
expect(deadletterQueue[0].url === request.url).to.be.true;
expect(deadletterQueue[0].attemptCount).to.be.equal(6);
expect(deadletterStorage.length).to.be.equal(1);
expect(deadletterStorage[0] !== request).to.be.true;
expect(deadletterStorage[0].type === request.type).to.be.true;
expect(deadletterStorage[0].url === request.url).to.be.true;
expect(deadletterStorage[0].attemptCount).to.be.equal(6);
});
});
});
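
The rewritten test captures the new flow: once a request exceeds five attempts, _requeue hands it to the crawler's deadletter store rather than pushing it onto a deadletter queue, which is why the assertions now inspect deadletterStorage. A hedged sketch of that branch, using the storeDeadletter, repush, and createRequeuable members that appear elsewhere in this diff; the reason text is illustrative:

function requeueOrDeadletterSketch(crawler, request) {
  request.attemptCount = request.attemptCount || 0;
  if (++request.attemptCount > 5) {
    // Too many retries: capture the request in the deadletter store for later inspection.
    return crawler.storeDeadletter(request, 'Exceeded attempt count'); // reason text illustrative
  }
  // Otherwise requeue a fresh copy of the request on its retry queue.
  return crawler.queues.repush(request, request.createRequeuable());
}
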
@@ -764,7 +732,6 @@ describe('Crawler whole meal deal', () => {
it('should process normal requests', () => {
const crawler = createFullCrawler();
const normal = crawler.queues.queueTable['normal'];
const priority = crawler.queues.queueTable['priority'];
const request = new Request('user', 'http://test.com/users/user1');
request.policy = TraversalPolicy.reload('user');
@@ -772,8 +739,7 @@
crawler.fetcher.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(normal.pop.callCount).to.be.equal(1, 'normal call count');
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1, 'lock call count');
@@ -804,13 +770,8 @@
error => assert.fail());
});
it('should empty request queues', () => {
// TODO
});
it('should handle getRequest reject', () => {
const crawler = createFullCrawler();
const priority = crawler.queues.queueTable['priority'];
// setup a problem popping
const normal = createBaseQueue('normal');
@@ -824,7 +785,6 @@
crawler.fetcher.responses = [createResponse(null, 500)];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
@@ -857,7 +817,6 @@
it('should handle fetch reject', () => {
const crawler = createFullCrawler();
const normal = crawler.queues.queueTable['normal'];
const priority = crawler.queues.queueTable['priority'];
// setup a good request but a server error response
normal.requests = [new Request('user', 'http://test.com/users/user1')];
@@ -866,7 +825,6 @@
return crawler.processOne({ name: 'test' });
}).then(
() => {
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
@@ -897,14 +855,12 @@
it('should handle process document reject', () => {
const crawler = createFullCrawler();
crawler.processor = { process: () => { throw new Error('bad processor') } };
const priority = crawler.queues.queueTable['priority'];
const normal = crawler.queues.queueTable['normal'];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.fetcher.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
@@ -1035,7 +991,7 @@ function createFullCrawler() {
return Q(request);
});
const GitHubProcessor = require('../lib/githubProcessor');
const GitHubProcessor = require('../../providers/fetcher/githubProcessor');
const processor = new GitHubProcessor();
sinon.spy(processor, 'process');
@@ -1074,12 +1030,12 @@ function createErrorResponse(error) {
};
}
function createBaseCrawler({ queues = createBaseQueues(), store = createBaseStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), fetcher = null, options = createBaseOptions() } = {}) {
function createBaseCrawler({ queues = createBaseQueues(), store = createBaseStore(), deadletters = createDeadletterStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), fetcher = null, options = createBaseOptions() } = {}) {
if (!fetcher) {
fetcher = createBaseFetcher();
}
const processor = new GitHubProcessor(store);
return new Crawler(queues, store, locker, fetcher, processor, options.crawler);
return new Crawler(queues, store, deadletters, locker, fetcher, processor, options.crawler);
}
function createBaseOptions(logger = createBaseLog()) {
@@ -1120,7 +1076,7 @@ function createBaseOptions(logger = createBaseLog()) {
}
function createBaseQueues({ priority = null, normal = null, deadletter = null, options = null } = {}) {
return new QueueSet([priority || createBaseQueue('priority'), normal || createBaseQueue('normal')], deadletter || createBaseQueue('deadletter'), (options || createBaseOptions()).queuing);
return new QueueSet([priority || createBaseQueue('priority'), normal || createBaseQueue('normal')], (options || createBaseOptions()).queuing);
}
function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null } = {}) {
@@ -1141,6 +1097,12 @@ function createBaseStore({ etag = null, upsert = null, get = null } = {}) {
return result;
}
function createDeadletterStore({ upsert = null } = {}) {
const result = {};
result.upsert = upsert || (() => { assert.fail('should not upsert'); });
return result;
}
function createBaseLog({ log = null, info = null, warn = null, error = null, verbose = null, silly = null } = {}) {
const result = {};
result.log = log || (() => { });

View file

@@ -84,7 +84,7 @@ describe('GitHub fetcher', () => {
expect(request.shouldRequeue()).to.be.false;
expect(request.shouldSkip()).to.be.false;
expect(fetcher.tokenFactory.exhaust.callCount).to.be.equal(1);
expect(fetcher.tokenFactory.exhaust.getCall(0).args[1]).to.be.approximately(resetTime * 1000, 20);
expect(fetcher.tokenFactory.exhaust.getCall(0).args[1]).to.be.approximately(resetTime * 1000 + 5000, 20);
});
});
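
The expected bench time now includes an extra 5000ms beyond the rate-limit reset, which suggests the fetcher pads the exhaustion window so a token is not reused at the exact reset boundary. A hedged sketch of that computation, inferred from the assertion above rather than from fetcher source shown here:

// resetTime is assumed to be the X-RateLimit-Reset value in seconds since the epoch.
function computeExhaustUntil(resetTime, bufferMs = 5000) {
  return resetTime * 1000 + bufferMs;
}

// e.g. tokenFactory.exhaust(token, computeExhaustUntil(resetTime));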

View file

@@ -16,77 +16,27 @@ describe('QueueSet construction', () => {
describe('QueueSet weighting', () => {
it('should create a simple startMap', () => {
const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2]));
const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], createOptions({ '1': 3, '2': 2 }));
expect(set.startMap.length).to.be.equal(5);
expect(set.startMap[0]).to.be.equal(0);
expect(set.startMap[1]).to.be.equal(0);
expect(set.startMap[2]).to.be.equal(0);
expect(set.startMap[3]).to.be.equal(1);
expect(set.startMap[4]).to.be.equal(1);
});
it('should create a default startMap if no weights given', () => {
const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, { _config: { on: () => { } } });
expect(set.startMap.length).to.be.equal(1);
const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], { _config: { on: () => { } } });
expect(set.startMap.length).to.be.equal(2);
expect(set.startMap[0]).to.be.equal(0);
});
it('should throw if too many weights are given', () => {
expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2, 1]))).to.throw(Error);
expect(set.startMap[1]).to.be.equal(1);
});
it('should throw if no weights are given', () => {
expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, [])).to.throw(Error);
expect(() => new QueueSet([createBaseQueue('1'), createBaseQueue('2')], {})).to.throw(Error);
});
it('should create a simple startMap', () => {
const set = new QueueSet([createBaseQueue('1'), createBaseQueue('2')], null, createOptions([3, 2]));
expect(set.startMap.length).to.be.equal(5);
expect(set.startMap[0]).to.be.equal(0);
expect(set.startMap[2]).to.be.equal(0);
expect(set.startMap[3]).to.be.equal(1);
expect(set.startMap[4]).to.be.equal(1);
});
it('should pop from first with default weights', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('priority');
expect(first._originQueue === priority).to.be.true;
expect(second.type).to.be.equal('priority');
expect(second._originQueue === priority).to.be.true;
});
});
it('should pop in order when requests always available', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('priority');
expect(first._originQueue === priority).to.be.true;
expect(second.type).to.be.equal('normal');
expect(second._originQueue === normal).to.be.true;
});
});
it('should pop from subsequent if previous queue is empty', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('normal');
expect(first._originQueue === normal).to.be.true;
expect(second.type).to.be.equal('normal');
expect(second._originQueue === normal).to.be.true;
});
});
it('should pop earlier queue if starting later and nothing available', () => {
it('should pop other queue if nothing available', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
@@ -126,8 +76,7 @@ describe('QueueSet pushing', () => {
it('should repush into the same queue', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, push: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
const queues = createBaseQueues([priority]);
sinon.spy(priority, 'push');
return queues.pop().then(request => {
@@ -143,8 +92,7 @@
describe('QueueSet originQueue management', () => {
it('should call done and mark acked on done', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, done: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
const queues = createBaseQueues([priority]);
sinon.spy(priority, 'done');
return queues.pop().then(request => {
@@ -158,8 +106,7 @@ describe('QueueSet originQueue management', () => {
it('should call done and mark acked on abandon', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
const queues = createBaseQueues([priority]);
sinon.spy(priority, 'abandon');
return queues.pop().then(request => {
@@ -173,8 +120,7 @@ describe('QueueSet originQueue management', () => {
it('should not abandon twice', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
const queues = createBaseQueues([priority]);
sinon.spy(priority, 'abandon');
return queues.pop().then(request => {
@@ -190,8 +136,7 @@ describe('QueueSet originQueue management', () => {
it('should not done after abandon ', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); }, done: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
const queues = createBaseQueues([priority]);
sinon.spy(priority, 'abandon');
sinon.spy(priority, 'done');
@@ -209,35 +154,29 @@
});
describe('QueueSet subscription management', () => {
it('should subscribe all, including deadletter', () => {
it('should subscribe all', () => {
const priority = createBaseQueue('priority', { subscribe: () => { } });
const normal = createBaseQueue('normal', { subscribe: () => { } });
const deadletter = createBaseQueue('deadletter', { subscribe: () => { } });
const queues = createBaseQueues([priority, normal], deadletter);
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'subscribe');
sinon.spy(normal, 'subscribe');
sinon.spy(deadletter, 'subscribe');
return queues.subscribe().then(() => {
expect(priority.subscribe.callCount).to.be.equal(1);
expect(normal.subscribe.callCount).to.be.equal(1);
expect(deadletter.subscribe.callCount).to.be.equal(1);
});
});
it('should unsubscribe all, including deadletter', () => {
it('should unsubscribe all', () => {
const priority = createBaseQueue('priority', { unsubscribe: () => { } });
const normal = createBaseQueue('normal', { unsubscribe: () => { } });
const deadletter = createBaseQueue('deadletter', { unsubscribe: () => { } });
const queues = createBaseQueues([priority, normal], deadletter);
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'unsubscribe');
sinon.spy(normal, 'unsubscribe');
sinon.spy(deadletter, 'unsubscribe');
return queues.unsubscribe().then(() => {
expect(priority.unsubscribe.callCount).to.be.equal(1);
expect(normal.unsubscribe.callCount).to.be.equal(1);
expect(deadletter.unsubscribe.callCount).to.be.equal(1);
});
});
});
@@ -249,8 +188,8 @@ function createOptions(weights) {
};
}
function createBaseQueues(queues, deadletter, weights = [1]) {
return new QueueSet(queues, deadletter || createBaseQueue('deadletter'), createOptions(weights));
function createBaseQueues(queues, weights = null) {
return new QueueSet(queues, createOptions(weights));
}
function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null, subscribe = null, unsubscribe = null } = {}) {