From 5c390588871b1f568ba0de0b2f0a539c84ccb3a0 Mon Sep 17 00:00:00 2001
From: Jeff McAffer
Date: Thu, 17 Nov 2016 15:33:44 -0800
Subject: [PATCH] collapse queues into a queueset

---
 index.js             |   7 +-
 lib/crawler.js       |  96 ++++++++++++---------
 lib/processor.js     |   6 +-
 lib/queueSet.js      |  62 ++++++++++++++
 lib/request.js       |  24 ++++--
 test/crawlerTests.js | 195 ++++++++++++++++++++++++-------------------
 6 files changed, 252 insertions(+), 138 deletions(-)
 create mode 100644 lib/queueSet.js

diff --git a/index.js b/index.js
index c42e4e0..0b3b068 100644
--- a/index.js
+++ b/index.js
@@ -1,5 +1,6 @@
-module.exports.eventFinder = require('./lib/eventFinder');
-module.exports.webhookDriver = require('./lib/webhookDriver');
 module.exports.crawler = require('./lib/crawler');
-module.exports.request = require('./lib/request');
+module.exports.eventFinder = require('./lib/eventFinder');
 module.exports.processor = require('./lib/processor');
+module.exports.queueSet = require('./lib/queueSet');
+module.exports.request = require('./lib/request');
+module.exports.webhookDriver = require('./lib/webhookDriver');
diff --git a/lib/crawler.js b/lib/crawler.js
index dafef6f..e32e5b7 100644
--- a/lib/crawler.js
+++ b/lib/crawler.js
@@ -6,10 +6,8 @@ const Request = require('./request');
 const URL = require('url');
 
 class Crawler {
-  constructor(queue, priorityQueue, deadletterQueue, store, locker, requestor, config, logger) {
-    this.normalQueue = queue;
-    this.priorityQueue = priorityQueue;
-    this.deadletterQueue = deadletterQueue;
+  constructor(queues, store, locker, requestor, config, logger) {
+    this.queues = queues;
     this.store = store;
     this.locker = locker;
     this.requestor = requestor;
@@ -48,9 +46,7 @@ class Crawler {
 
   _getRequest(requestBox, name) {
     const self = this;
-    return Q()
-      .then(self.log(self._pop.bind(self, self.priorityQueue)))
-      .then(self.log(self._pop.bind(self, self.normalQueue)))
+    return this.log(this.queues.pop())
       .then(request => {
         if (!request) {
          request = new Request('_blank', null);
@@ -67,24 +63,13 @@ class Crawler {
       .then(self.log(self._acquireLock.bind(self)));
   }
 
-  _pop(queue, request = null) {
-    return Q.try(() => {
-      return request ? request : queue.pop();
-    }).then(result => {
-      if (result && !result.originQueue) {
-        result.originQueue = queue;
-      }
-      return result;
-    });
-  }
-
   _acquireLock(request) {
     if (!request.url || !this.locker) {
       return Q(request);
     }
     const self = this;
     return Q.try(() => {
-      return this.log(self.locker.lock(request.url, 5 * 60 * 1000), 'lock');
+      return this.log(self.locker.lock(request.url, 1 * 60 * 1000), 'lock');
     }).then(
       lock => {
         request.lock = lock;
@@ -94,8 +79,15 @@ class Crawler {
         // If we could not acquire a lock, abandon the request so it will be returned to the queue.
         // If that fails, throw the original error
         return Q.try(() => {
-          this.log(request.originQueue.abandon(request), 'abandon');
-        }).finally(() => { throw error; });
+          this.log(this.queues.abandon(request), 'abandon');
+        }).finally(() => {
+          // Don't throw if this is the normal 'Exceeded' scenario. It is not an error; someone else is simply processing this request.
+          if (error.message.startsWith('Exceeded')) {
+            request.markRequeue('Requeued', `Could not lock ${request.url}`);
+            return request;
+          }
+          throw error;
+        });
     });
   }
 
@@ -135,11 +127,11 @@ class Crawler {
     request.attemptCount = request.attemptCount || 0;
     if (++request.attemptCount > 5) {
       this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
-      this.queue(request, request, this.deadletterQueue);
+      request.track(this._queueDead(request));
     } else {
       request.addMeta({ attempt: request.attemptCount });
       this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
-      this.queue(request, request, request.originQueue);
+      request.track(this._requeueOrigin(request));
     }
   }
 
@@ -149,6 +141,9 @@ class Crawler {
     const delayGate = request.nextRequestTime || now;
     const nextRequestTime = Math.max(requestGate, delayGate, now);
     const delay = Math.max(0, nextRequestTime - now);
+    if (delay) {
+      this.logger.verbose(`Crawler: ${name} waiting for ${delay}ms`);
+    }
     setTimeout(this.start.bind(this, name), delay);
   }
 
@@ -202,6 +197,10 @@ class Crawler {
           return request.markSkip('Unmodified');
         }
         return self.store.get(fetchType, request.url).then(document => {
+          // if the doc had stored headers (e.g., page responses) then reconstitute them for processing
+          if (document._metadata.headers) {
+            Object.assign(githubResponse.headers, document._metadata.headers);
+          }
           request.document = document;
          request.response = githubResponse;
           // Our store is up to date so don't store
@@ -221,16 +220,21 @@ class Crawler {
       return Q(request);
     }
 
-    // If the doc is an array, wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
+    // If the doc is an array,
+    // * wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
+    // * save the link header as GitHub will not return those in a subsequent 304
+    let headers = {};
     if (Array.isArray(request.document)) {
       request.document = { elements: request.document };
+      headers = { link: request.response.headers.link };
     }
     request.document._metadata = {
       type: request.type,
       url: request.url,
       etag: request.response.headers.etag,
       fetchedAt: moment.utc().toISOString(),
-      links: {}
+      links: {},
+      headers: headers
     };
     return Q(request);
   }
 
@@ -256,7 +260,7 @@ class Crawler {
   }
 
   _deleteFromQueue(request) {
-    return request.originQueue.done(request).then(() => request);
+    return this.queues.done(request).then(() => { return request; });
   }
 
   _logOutcome(request) {
@@ -266,7 +270,7 @@ class Crawler {
       error.request = request;
       this.logger.error(error);
     } else {
-      request.addMeta({ total: Date.now() - request.start });
+      request.addMeta({ time: Date.now() - request.start });
       this.logger.info(`${outcome} ${request.type} [${request.url}] ${request.message || ''}`, request.meta);
     }
     return request;
@@ -274,22 +278,33 @@ class Crawler {
 
   // =============== Helpers ============
 
-  queue(request, newRequest, queue = null) {
-    if (!this._shouldInclude(newRequest.type, newRequest.url)) {
-      this.logger.verbose(`Filtered ${newRequest.type} [${newRequest.url}]`);
-      return;
+  _requeueOrigin(request) {
+    const queuable = this._createQueuable(request);
+    return this.queues.repush(request, queuable);
+  }
+
+  _queueDead(request) {
+    const queuable = this._createQueuable(request);
+    return this.queues.pushDead(queuable);
+  }
+
+  queue(request, priority = false) {
+    if (!this._shouldInclude(request.type, request.url)) {
+      this.logger.verbose(`Filtered ${request.type} [${request.url}]`);
+      return [];
     }
+    const queuable = this._createQueuable(request);
+    return priority ? this.queues.pushPriority(queuable) : this.queues.push(queuable);
+  }
+
+  _createQueuable(request) {
     // Create a new request data structure that has just the things we should queue
-    const queuable = new Request(newRequest.type, newRequest.url);
-    queuable.attemptCount = newRequest.attemptCount;
-    queuable.context = newRequest.context;
-    queuable.force = newRequest.force;
-    queuable.subType = newRequest.subType;
-
-    queue = queue || this.normalQueue;
-    request.promises.push(queue.push(queuable));
-    return request;
+    const queuable = new Request(request.type, request.url);
+    queuable.attemptCount = request.attemptCount;
+    queuable.context = request.context;
+    queuable.force = request.force;
+    queuable.subType = request.subType;
+    return queuable;
   }
 
   _shouldInclude(type, target) {
@@ -314,6 +329,7 @@ class Crawler {
     // If we hit the low water mark for requests, proactively sleep until the next ratelimit reset
     // This code is not designed to handle the 403 scenarios. That is handled by the retry logic.
     const remaining = parseInt(response.headers['x-ratelimit-remaining']) || 0;
+    request.addMeta({ remaining: remaining });
     const tokenLowerBound = this.config ? (this.config.tokenLowerBound || 50) : 50;
     if (remaining < tokenLowerBound) {
       const reset = parseInt(response.headers['x-ratelimit-reset']) || 0;
diff --git a/lib/processor.js b/lib/processor.js
index fe86b29..7757ad2 100644
--- a/lib/processor.js
+++ b/lib/processor.js
@@ -26,14 +26,18 @@ class Processor {
     const linkHeader = request.response.headers.link;
     if (linkHeader) {
       const links = parse(linkHeader);
+      const requests = [];
       for (let i = 2; i <= links.last.page; i++) {
         const url = request.url + `?page=${i}&per_page=100`;
         const newRequest = new Request('page', url);
         newRequest.force = request.force;
         newRequest.subType = request.subType;
         newRequest.context = { qualifier: request.context.qualifier };
-        request.crawler.queue(request, newRequest, request.crawler.priorityQueue);
+        requests.push(newRequest);
       }
+      // TODO: this is a bit reachy. We need a better way to efficiently queue up
+      // requests that we know are good.
+      request.track(request.crawler.queues.pushPriority(requests));
     }
 
     // Rewrite the request and document to be a 'page' and then process.
diff --git a/lib/queueSet.js b/lib/queueSet.js
new file mode 100644
index 0000000..b10c25a
--- /dev/null
+++ b/lib/queueSet.js
@@ -0,0 +1,62 @@
+const Q = require('q');
+const qlimit = require('qlimit');
+
+class QueueSet {
+  constructor(priority, normal, deadletter) {
+    this.priority = priority;
+    this.normal = normal;
+    this.deadletter = deadletter;
+    this.allQueues = [priority, normal, deadletter];
+  }
+
+  pushPriority(requests) {
+    return this.push(requests, this.priority);
+  }
+
+  push(requests, queue = this.normal) {
+    return queue.push(requests);
+  }
+
+  pushDead(requests) {
+    return this.push(requests, this.deadletter);
+  }
+
+  repush(original, newRequest) {
+    return this.push(newRequest, original._originQueue);
+  }
+
+  subscribe() {
+    return Q.all(this.allQueues.map(queue => { return queue.subscribe(); }));
+  }
+
+  unsubscribe() {
+    return Q.all(this.allQueues.map(queue => { return queue.unsubscribe(); }));
+  }
+
+  pop() {
+    return Q()
+      .then(this._pop.bind(this, this.priority))
+      .then(this._pop.bind(this, this.normal));
+  }
+
+  _pop(queue, request = null) {
+    return Q.try(() => {
+      return request ? request : queue.pop();
+    }).then(result => {
+      if (result && !result._originQueue) {
+        result._originQueue = queue;
+      }
+      return result;
+    });
+  }
+
+  done(request) {
+    return request._originQueue ? request._originQueue.done(request) : Q();
+  }
+
+  abandon(request) {
+    return request._originQueue ? request._originQueue.abandon(request) : Q();
+  }
+}
+
+module.exports = QueueSet;
\ No newline at end of file
diff --git a/lib/request.js b/lib/request.js
index 8d44621..833cc6a 100644
--- a/lib/request.js
+++ b/lib/request.js
@@ -4,6 +4,18 @@ class Request {
   constructor(type, url) {
     this.type = type;
     this.url = url;
+    this.promises = [];
+  }
+
+  track(promises) {
+    if (!promises) {
+      return;
+    }
+    if (Array.isArray(promises)) {
+      Array.prototype.push.apply(this.promises, promises);
+    } else {
+      this.promises.push(promises);
+    }
   }
 
   addMeta(data) {
@@ -36,17 +48,17 @@ class Request {
     links[name] = { type: 'siblings', href: href };
   }
 
-  queue(type, url, context, queue = null) {
+  queue(type, url, context) {
     const newRequest = new Request(type, url);
     newRequest.context = context;
-    this.crawler.queue(this, newRequest, queue);
+    this.track(this.crawler.queue(newRequest));
   }
 
   queueRoot(type, url, force = false) {
     const newRequest = new Request(type, url);
     newRequest.context = { qualifier: 'urn:' };
     newRequest.force = force;
-    this.crawler.queue(this, newRequest);
+    this.track(this.crawler.queue(newRequest));
   }
 
   queueRoots(type, url, force = false) {
@@ -55,7 +67,7 @@ class Request {
     newContext.qualifier = this.document._metadata.links.self.href;
     newRequest.context = newContext;
     newRequest.force = force;
-    this.crawler.queue(this, newRequest);
+    this.track(this.crawler.queue(newRequest));
   }
 
   queueChild(type, url, qualifier) {
@@ -63,7 +75,7 @@ class Request {
     newRequest.context = this.context || {};
     newRequest.context.qualifier = qualifier;
     newRequest.force = this.force;
-    this.crawler.queue(this, newRequest);
+    this.track(this.crawler.queue(newRequest));
   }
 
   queueChildren(type, url, context = null) {
@@ -72,7 +84,7 @@ class Request {
     newContext.qualifier = this.document._metadata.links.self.href;
     newRequest.context = newContext;
     newRequest.force = this.force;
-    this.crawler.queue(this, newRequest);
+    this.track(this.crawler.queue(newRequest));
   }
 
   markSkip(outcome, message) {
diff --git a/test/crawlerTests.js b/test/crawlerTests.js
index 92345af..c414758 100644
--- a/test/crawlerTests.js
+++ b/test/crawlerTests.js
@@ -4,6 +4,7 @@ const Crawler = require('../lib/crawler');
 const expect = require('chai').expect;
 const extend = require('extend');
 const Q = require('q');
+const QueueSet = require('../lib/queueSet');
 const Request = require('../lib/request');
 const sinon = require('sinon');
 
@@ -11,12 +12,13 @@
 describe('Crawler get request', () => {
   it('should get from the priority queue first', () => {
     const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
     const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
     const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(request => {
       expect(request.type).to.be.equal('priority');
-      expect(request.originQueue === priority).to.be.true;
+      expect(request._originQueue === queues.priority).to.be.true;
       expect(request.lock).to.be.equal('locked');
       expect(request.crawlerName).to.be.equal('test');
       expect(request).to.be.equal(requestBox[0]);
@@ -26,12 +28,13 @@ describe('Crawler get request', () => {
   it('should get from the normal queue if no priority', () => {
     const priority = createBaseQueue({ pop: () => { return Q(null); } });
     const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
     const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(request => {
       expect(request.type).to.be.equal('normal');
-      expect(request.originQueue === normal).to.be.true;
+      expect(request._originQueue === queues.normal).to.be.true;
       expect(request.lock).to.be.equal('locked');
       expect(request.crawlerName).to.be.equal('test');
       expect(request).to.be.equal(requestBox[0]);
@@ -41,7 +44,8 @@ describe('Crawler get request', () => {
   it('should return a dummy skip/delay request if none are queued', () => {
     const priority = createBaseQueue({ pop: () => { return Q(null); } });
     const normal = createBaseQueue({ pop: () => { return Q(null); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
+    const crawler = createBaseCrawler({ queues: queues });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(request => {
       expect(request.type).to.be.equal('_blank');
@@ -56,7 +60,8 @@ describe('Crawler get request', () => {
   it('should throw when normal pop errors', () => {
     const priority = createBaseQueue({ pop: () => { return Q(null); } });
     const normal = createBaseQueue({ pop: () => { throw new Error('normal test'); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
+    const crawler = createBaseCrawler({ queues: queues });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(
       request => assert.fail(),
@@ -67,7 +72,8 @@ describe('Crawler get request', () => {
   it('should throw when priority pop errors', () => {
     const priority = createBaseQueue({ pop: () => { throw new Error('priority test'); } });
     const normal = createBaseQueue({ pop: () => { return Q(null); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
+    const crawler = createBaseCrawler({ queues: queues });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(
       request => assert.fail(),
@@ -78,8 +84,9 @@ describe('Crawler get request', () => {
   it('should throw when acquire lock errors', () => {
     const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
     const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
     const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(
       request => assert.fail(),
@@ -97,8 +104,9 @@ describe('Crawler get request', () => {
       }
     });
     const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
     const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(
       request => assert.fail(),
@@ -115,8 +123,9 @@ describe('Crawler get request', () => {
       abandon: request => { throw new Error('Abandon error'); }
     });
     const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const queues = createBaseQueues({ priority: priority, normal: normal });
     const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
-    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     const requestBox = [];
     return crawler._getRequest(requestBox, 'test').then(
       request => assert.fail(),
@@ -447,10 +456,10 @@ describe('Crawler queue', () => {
     const config = { orgFilter: new Set(['test']) };
     const queue = [];
     const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
-    const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
-    request = { promises: [] };
-    const crawler = createBaseCrawler({ normal: normal, options: config });
-    crawler.queue(request, newRequest);
+    const queues = createBaseQueues({ normal: normal });
+    const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
+    const crawler = createBaseCrawler({ queues: queues, options: config });
+    crawler.queue(request);
     expect(request.promises.length).to.be.equal(0);
     expect(queue.length).to.be.equal(0);
   });
@@ -459,30 +468,31 @@ describe('Crawler queue', () => {
     const config = { orgFilter: new Set(['microsoft']) };
     const queue = [];
     const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
-    const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
-    request = { promises: [] };
-    const crawler = createBaseCrawler({ normal: normal, options: config });
-    crawler.queue(request, newRequest);
+    const queues = createBaseQueues({ normal: normal });
+    const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
+    const crawler = createBaseCrawler({ queues: queues, options: config });
+    request.track(crawler.queue(request));
     expect(request.promises.length).to.be.equal(1);
     expect(queue.length).to.be.equal(1);
-    expect(queue[0] !== newRequest).to.be.true;
-    expect(queue[0].type === newRequest.type).to.be.true;
-    expect(queue[0].url === newRequest.url).to.be.true;
+    expect(queue[0] !== request).to.be.true;
+    expect(queue[0].type === request.type).to.be.true;
+    expect(queue[0].url === request.url).to.be.true;
   });
 
+  // TODO
   it('should queue in supplied queue', () => {
     const config = { orgFilter: new Set(['microsoft']) };
     const queue = [];
-    const supplied = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
-    const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
-    request = { promises: [] };
-    const crawler = createBaseCrawler({ options: config });
-    crawler.queue(request, newRequest, supplied);
+    const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal });
+    const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
+    const crawler = createBaseCrawler({ queues: queues, options: config });
+    request.track(crawler.queue(request));
     expect(request.promises.length).to.be.equal(1);
     expect(queue.length).to.be.equal(1);
-    expect(queue[0] !== newRequest).to.be.true;
-    expect(queue[0].type === newRequest.type).to.be.true;
-    expect(queue[0].url === newRequest.url).to.be.true;
+    expect(queue[0] !== request).to.be.true;
+    expect(queue[0].type === request.type).to.be.true;
+    expect(queue[0].url === request.url).to.be.true;
   });
 });
 
@@ -497,12 +507,12 @@ describe('Crawler requeue', () => {
   it('should requeue in same queue as before', () => {
     const queue = [];
     const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
-    const crawler = createBaseCrawler({ normal: normal });
+    const queues = createBaseQueues({ normal: normal });
+    const crawler = createBaseCrawler({ queues: queues });
     for (let i = 0; i < 5; i++) {
       const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
       request.markRequeue();
-      request.promises = [];
-      request.originQueue = normal;
+      request._originQueue = normal;
       request.attemptCount = i === 0 ? null : i;
       crawler._requeue(request);
       expect(request.promises.length).to.be.equal(1);
@@ -518,23 +528,23 @@ describe('Crawler requeue', () => {
 
   it('should requeue in deadletter queue after 5 attempts', () => {
     const queue = [];
-    const deadLetterQueue = [];
+    const deadletterQueue = [];
     const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
-    const deadLetter = createBaseQueue({ push: request => { deadLetterQueue.push(request); return Q(); } });
+    const deadletter = createBaseQueue({ push: request => { deadletterQueue.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal, deadletter: deadletter });
     const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
-    request.promises = [];
     request.attemptCount = 5;
     request.markRequeue();
-    request.originQueue = normal;
-    const crawler = createBaseCrawler({ normal: normal, deadLetter: deadLetter });
+    request._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues });
     crawler._requeue(request);
     expect(request.promises.length).to.be.equal(1);
     expect(queue.length).to.be.equal(0);
-    expect(deadLetterQueue.length).to.be.equal(1);
-    expect(deadLetterQueue[0] !== request).to.be.true;
-    expect(deadLetterQueue[0].type === request.type).to.be.true;
-    expect(deadLetterQueue[0].url === request.url).to.be.true;
-    expect(deadLetterQueue[0].attemptCount).to.be.equal(6);
+    expect(deadletterQueue.length).to.be.equal(1);
+    expect(deadletterQueue[0] !== request).to.be.true;
+    expect(deadletterQueue[0].type === request.type).to.be.true;
+    expect(deadletterQueue[0].url === request.url).to.be.true;
+    expect(deadletterQueue[0].attemptCount).to.be.equal(6);
   });
 });
 
@@ -543,12 +553,12 @@ describe('Crawler complete request', () => {
     const done = [];
     const unlock = [];
     const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
-    originalRequest.promises = [];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    originalRequest._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(request => {
       expect(request === originalRequest).to.be.true;
       expect(request.lock).to.be.null;
@@ -567,13 +577,13 @@ describe('Crawler complete request', () => {
       push: request => { queue.push(request); return Q(); },
       done: request => { done.push(request); return Q(); }
     });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.markRequeue();
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
-    originalRequest.promises = [];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    originalRequest._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(request => {
       expect(request === originalRequest).to.be.true;
       expect(request.lock).to.be.null;
@@ -592,12 +602,12 @@ describe('Crawler complete request', () => {
   it('should do all right things for requests with no url', () => {
     const done = [];
     const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal });
     const originalRequest = new Request('test', null);
     originalRequest.markRequeue();
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
-    originalRequest.promises = [];
-    const crawler = createBaseCrawler({ normal: normal });
+    originalRequest._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues });
     return crawler._completeRequest(originalRequest).then(request => {
       expect(request === originalRequest).to.be.true;
       expect(done.length).to.be.equal(1);
@@ -616,6 +626,7 @@ describe('Crawler complete request', () => {
         return Q();
       }
     });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({
       unlock: request => {
         if (!promiseValue[0]) assert.fail();
@@ -625,9 +636,9 @@ describe('Crawler complete request', () => {
     });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
+    originalRequest._originQueue = normal;
     originalRequest.promises = [Q.delay(1).then(() => promiseValue[0] = 13)];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(
       request => {
         expect(request === originalRequest).to.be.true;
@@ -645,12 +656,13 @@ describe('Crawler complete request', () => {
     const done = [];
     const unlock = [];
     const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
+    originalRequest._originQueue = normal;
     originalRequest.promises = [Q.reject(13)];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(
       request => assert.fail(),
       error => {
@@ -665,12 +677,12 @@ describe('Crawler complete request', () => {
     const done = [];
     const unlock = [];
     const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({ unlock: () => { throw new Error('sigh'); } });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
-    originalRequest.promises = [];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    originalRequest._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(
       request => {
         expect(request === originalRequest).to.be.true;
@@ -686,12 +698,12 @@ describe('Crawler complete request', () => {
     const done = [];
     const unlock = [];
     const normal = createBaseQueue({ done: () => { throw new Error('sigh'); } });
+    const queues = createBaseQueues({ normal: normal });
     const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
     const originalRequest = new Request('test', 'http://test.com');
     originalRequest.lock = 42;
-    originalRequest.originQueue = normal;
-    originalRequest.promises = [];
-    const crawler = createBaseCrawler({ normal: normal, locker: locker });
+    originalRequest._originQueue = normal;
+    const crawler = createBaseCrawler({ queues: queues, locker: locker });
     return crawler._completeRequest(originalRequest).then(
       request => assert.fail(),
       error => {
@@ -890,13 +902,13 @@ describe('Crawler whole meal deal', () => {
     const crawler = createFullCrawler();
     sinon.stub(crawler, '_startNext', () => Q());
 
-    crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
+    crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
     crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
     return Q.try(() => {
       return crawler.start('test');
     }).then(() => {
-      expect(crawler.priorityQueue.pop.callCount).to.be.equal(1, 'priority call count');
-      expect(crawler.normalQueue.pop.callCount).to.be.equal(1, 'normal call count');
+      expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
+      expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
 
       const lock = crawler.locker.lock;
       expect(lock.callCount).to.be.equal(1, 'lock call count');
@@ -925,13 +937,14 @@ describe('Crawler whole meal deal', () => {
       expect(unlock.callCount).to.be.equal(1);
       expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
 
-      expect(crawler.normalQueue.done.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.done.callCount).to.be.equal(1);
 
       expect(crawler.logger.error.callCount).to.be.equal(1);
     });
   });
 
   it('should empty request queues', () => {
+    // TODO
   });
 
   it('should handle getRequest reject', () => {
@@ -943,14 +956,14 @@ describe('Crawler whole meal deal', () => {
     sinon.stub(normal, 'pop', () => { throw Error('cant pop') });
     sinon.stub(normal, 'push', request => { return Q(); });
     sinon.spy(normal, 'done');
-    crawler.normalQueue = normal;
+    crawler.queues.normal = normal;
 
     crawler.requestor.responses = [createResponse(null, 500)];
     return Q.try(() => {
       return crawler.start('test');
     }).then(() => {
-      expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
-      expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
 
       const lock = crawler.locker.lock;
       expect(lock.callCount).to.be.equal(0);
@@ -961,7 +974,7 @@ describe('Crawler whole meal deal', () => {
       const requestorGet = crawler.requestor.get;
       expect(requestorGet.callCount).to.be.equal(0);
 
-      const push = crawler.normalQueue.push;
+      const push = crawler.queues.normal.push;
       expect(push.callCount).to.be.equal(0);
 
       const upsert = crawler.store.upsert;
@@ -970,7 +983,7 @@ describe('Crawler whole meal deal', () => {
       const unlock = crawler.locker.unlock;
       expect(unlock.callCount).to.be.equal(0);
 
-      expect(crawler.normalQueue.done.callCount).to.be.equal(0);
+      expect(crawler.queues.normal.done.callCount).to.be.equal(0);
 
       expect(crawler.logger.error.callCount).to.be.equal(1);
       const error = crawler.logger.error.getCall(0).args[0];
@@ -983,13 +996,13 @@ describe('Crawler whole meal deal', () => {
     sinon.stub(crawler, '_startNext', () => Q());
 
     // setup a good request but a server error response
-    crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
+    crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
     crawler.requestor.responses = [createResponse(null, 500)];
     return Q.try(() => {
       return crawler.start('test');
     }).then(() => {
-      expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
-      expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
 
       const lock = crawler.locker.lock;
       expect(lock.callCount).to.be.equal(1);
@@ -1004,7 +1017,7 @@ describe('Crawler whole meal deal', () => {
       expect(requestorGet.callCount).to.be.equal(1);
       expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
 
-      const push = crawler.normalQueue.push;
+      const push = crawler.queues.normal.push;
       expect(push.callCount).to.be.equal(1);
       const newRequest = push.getCall(0).args[0];
       expect(newRequest.type).to.be.equal('user');
@@ -1017,7 +1030,7 @@ describe('Crawler whole meal deal', () => {
       expect(unlock.callCount).to.be.equal(1);
       expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
 
-      expect(crawler.normalQueue.done.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.done.callCount).to.be.equal(1);
 
       expect(crawler.logger.error.callCount).to.be.equal(1);
       const error = crawler.logger.error.getCall(0).args[0];
@@ -1030,13 +1043,13 @@ describe('Crawler whole meal deal', () => {
     sinon.stub(crawler, '_startNext', () => Q());
     crawler.processor = { process: () => { throw new Error('bad processor') } };
 
-    crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
+    crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
     crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
     return Q.try(() => {
       return crawler.start('test');
     }).then(() => {
-      expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
-      expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
 
       const lock = crawler.locker.lock;
       expect(lock.callCount).to.be.equal(1);
@@ -1051,7 +1064,7 @@ describe('Crawler whole meal deal', () => {
       const requestorGet = crawler.requestor.get;
       expect(requestorGet.callCount).to.be.equal(1);
       expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
 
-      const push = crawler.normalQueue.push;
+      const push = crawler.queues.normal.push;
       expect(push.callCount).to.be.equal(1);
       const newRequest = push.getCall(0).args[0];
       expect(newRequest.type).to.be.equal('user');
@@ -1064,7 +1077,7 @@ describe('Crawler whole meal deal', () => {
       expect(unlock.callCount).to.be.equal(1);
       expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
 
-      expect(crawler.normalQueue.done.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.done.callCount).to.be.equal(1);
 
       expect(crawler.logger.error.callCount).to.be.equal(1);
       const error = crawler.logger.error.getCall(0).args[0];
@@ -1077,7 +1090,7 @@ describe('Crawler whole meal deal', () => {
     sinon.stub(crawler, '_startNext', () => Q());
     crawler.store = { upsert: () => { throw new Error('bad upsert') } };
 
-    crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
+    crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
     crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
     return Q.try(() => {
       return crawler.start('test');
@@ -1086,9 +1099,9 @@ describe('Crawler whole meal deal', () => {
       expect(unlock.callCount).to.be.equal(1);
       expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
 
-      expect(crawler.normalQueue.done.callCount).to.be.equal(1);
+      expect(crawler.queues.normal.done.callCount).to.be.equal(1);
 
-      const push = crawler.normalQueue.push;
+      const push = crawler.queues.normal.push;
       expect(push.callCount).to.be.equal(1);
       const newRequest = push.getCall(0).args[0];
       expect(newRequest.type).to.be.equal('user');
@@ -1105,12 +1118,12 @@ describe('Crawler whole meal deal', () => {
     sinon.stub(crawler, '_startNext', () => Q());
     crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
 
-    crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
+    crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
     crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
     return Q.try(() => {
       return crawler.start('test');
     }).then(() => {
-      const push = crawler.normalQueue.push;
+      const push = crawler.queues.normal.push;
       expect(push.callCount).to.be.equal(1);
       const newRequest = push.getCall(0).args[0];
       expect(newRequest.type).to.be.equal('user');
@@ -1134,6 +1147,8 @@ function createFullCrawler() {
   sinon.stub(normal, 'push', request => { return Q(); });
   sinon.spy(normal, 'done');
 
+  const queues = createBaseQueues({ priority: priority, normal: normal });
+
   const locker = createBaseLocker();
   sinon.stub(locker, 'lock', request => { return Q('lockToken'); });
   sinon.stub(locker, 'unlock', request => { return Q(); });
@@ -1158,7 +1173,7 @@ function createFullCrawler() {
   const config = [];
 
-  const result = createBaseCrawler({ normal: normal, priority: priority, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
+  const result = createBaseCrawler({ queues: queues, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
   result.processor = processor;
   return result;
 }
@@ -1211,8 +1226,12 @@ function createLinkHeader(target, previous, next, last) {
   return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
 }
 
-function createBaseCrawler({normal = createBaseQueue(), priority = createBaseQueue(), deadLetter = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
-  return new Crawler(normal, priority, deadLetter, store, locker, requestor, options, logger);
+function createBaseCrawler({queues = createBaseQueues(), store = createBaseStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
+  return new Crawler(queues, store, locker, requestor, options, logger);
+}
+
+function createBaseQueues({ priority = null, normal = null, deadletter = null } = {}) {
+  return new QueueSet(priority || createBaseQueue(), normal || createBaseQueue(), deadletter || createBaseQueue());
 }
 
 function createBaseQueue({ pop = null, push = null, done = null, abandon = null} = {}) {
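
---
Reviewer note: a minimal wiring sketch for the collapsed API, not part of the
patch. The InMemoryQueue below is hypothetical; QueueSet only assumes an object
with push/pop/done/abandon/subscribe/unsubscribe, as exercised by the tests:

  const Q = require('q');
  const QueueSet = require('./lib/queueSet');
  const Request = require('./lib/request');

  class InMemoryQueue {
    constructor() { this.requests = []; }
    push(requests) {
      // accept a single request or an array, mirroring how QueueSet.push is called
      requests = Array.isArray(requests) ? requests : [requests];
      Array.prototype.push.apply(this.requests, requests);
      return Q();
    }
    pop() { return Q(this.requests.shift() || null); }
    done(request) { return Q(); }                    // ack completed work
    abandon(request) { return this.push(request); }  // hand the request back
    subscribe() { return Q(); }
    unsubscribe() { return Q(); }
  }

  const queues = new QueueSet(new InMemoryQueue(), new InMemoryQueue(), new InMemoryQueue());
  // The crawler constructor now takes the set instead of three separate queues:
  //   new Crawler(queues, store, locker, requestor, config, logger)
  queues.push(new Request('user', 'http://test.com/users/user1'));
  queues.pop().then(request => {
    // pop() drains priority first, then normal; _originQueue records the source
    console.log(request.type, request._originQueue === queues.normal); // user true
  });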
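Reviewer note: the companion API shift is that crawler.queue() and the QueueSet
push helpers now return the push promise instead of mutating the caller's
request, and callers attach those promises to the originating request via
request.track(). Continuing the sketch above; the Q.all drain mirrors what the
completion path appears to rely on and is an assumption, not the crawler's
literal code:

  const origin = new Request('org', 'http://test.com/orgs/test');
  origin.track(queues.pushPriority(new Request('page', 'http://test.com/orgs/test?page=2')));
  origin.track(queues.push(new Request('repo', 'http://test.com/repos/test')));
  // Before marking origin done, wait for everything it queued:
  Q.all(origin.promises).then(() => queues.done(origin));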