From 18599eb7fa6d4bf6cf88d8815cfca4d31cf7eebb Mon Sep 17 00:00:00 2001
From: Jeff McAffer
Date: Sat, 12 Nov 2016 18:36:14 -0800
Subject: [PATCH] improve error and promise handling

---
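Note: the recurring idea in this change appears to be starting each step with Q()
so that a synchronous throw from a queue, locker, or processor call surfaces as a
promise rejection instead of escaping the chain. A minimal sketch of that pattern,
assuming only the Q library; popSafely and its queue argument are illustrative
names, not part of the crawler's API:

    const Q = require('q');

    // Q() yields an already-fulfilled promise, so anything thrown inside the
    // .then callback (for example by queue.pop()) becomes a rejection that the
    // caller can handle with .catch/.fail rather than an uncaught exception.
    function popSafely(queue) {
      return Q().then(() => queue.pop());
    }

    // popSafely({ pop: () => { throw new Error('boom'); } })
    //   .catch(error => console.log(error.message)); // logs 'boom'
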
 lib/crawler.js       | 107 ++++++++++++++++---------------
 lib/request.js       |   1 -
 test/crawlerTests.js | 146 +++++++++++++++++++++++++++++++++++--------
 3 files changed, 177 insertions(+), 77 deletions(-)

diff --git a/lib/crawler.js b/lib/crawler.js
index 83ff126..8500a95 100644
--- a/lib/crawler.js
+++ b/lib/crawler.js
@@ -47,49 +47,68 @@ class Crawler {

   _getRequest(requestBox, name) {
     const self = this;
-    return this._pop(this.priorityQueue)
-      .then(this._pop.bind(this, this.normalQueue))
-      .then(request => {
-        if (!request) {
-          request = new Request('wait', null);
-          request.markDelay();
-          request.markSkip('Exhausted queue', `Waiting 1 second`);
-        }
-        request.start = Date.now();
-        request.crawler = self;
-        request.crawlerName = name;
-        requestBox[0] = request;
-        return request;
-      })
-      .then(this._acquireLock.bind(this));
+    return Q().then(() => {
+      return this._pop(this.priorityQueue)
+        .then(this._pop.bind(this, this.normalQueue))
+        .then(request => {
+          if (!request) {
+            request = new Request('wait', null);
+            request.markDelay();
+            request.markSkip('Exhausted queue', `Waiting 1 second`);
+          }
+          request.start = Date.now();
+          request.crawler = self;
+          request.crawlerName = name;
+          requestBox[0] = request;
+          request.promises = [];
+          return request;
+        });
+    })
+      .then(this._acquireLock.bind(this));
+  }
+
+  _pop(queue, request = null) {
+    return Q().then(() => {
+      return request ? request : queue.pop();
+    }).then(result => {
+      if (result && !result.originQueue) {
+        result.originQueue = queue;
+      }
+      return result;
+    });
   }

   _acquireLock(request) {
-    if (request.url && this.locker) {
-      return this.locker.lock(request.url, 5 * 60 * 1000).then((lock) => {
+    if (!request.url || !this.locker) {
+      return Q(request);
+    }
+    const self = this;
+    return Q().then(() => {
+      return self.locker.lock(request.url, 5 * 60 * 1000);
+    }).then(
+      lock => {
         request.lock = lock;
         return request;
-      }, error => {
-        return request.originQueue.abandon(request).finally(() => {
-          throw error;
-        });
+      },
+      error => {
+        return Q().then(() => { request.originQueue.abandon(request); })
+          .finally(() => { throw error; });
       });
-    }
-    return request;
   }

   _releaseLock(request) {
-    const self = this;
-    if (request.lock && this.locker) {
-      return this.locker.unlock(request.lock).then(
-        () => {
-          return request;
-        }, error => {
-          self.logger.error(error);
-          return request;
-        });
+    if (!request.url || !this.locker) {
+      return Q(request);
     }
-    return Q(request);
+    const self = this;
+    return Q().then(() => {
+      return this.locker.unlock(request.lock);
+    }).then(
+      () => request,
+      error => {
+        self.logger.error(error);
+        return request;
+      });
   }

   _completeRequest(request) {
@@ -117,16 +136,6 @@ class Crawler {
     }
   }

-  _pop(queue, request = null) {
-    const self = this;
-    return (request ? Q(request) : queue.pop()).then(result => {
-      if (result) {
-        result.originQueue = queue;
-      }
-      return result;
-    });
-  }
-
   _startNext(name, request) {
     const delay = request.shouldDelay() ? 1000 : 0;
     setTimeout(this.start.bind(this, name), delay);
@@ -189,7 +198,7 @@ class Crawler {

   _convertToDocument(request) {
     if (request.shouldSkip()) {
-      return Q.resolve(request);
+      return Q(request);
     }

     // If the doc is an array, wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
@@ -203,12 +212,12 @@ class Crawler {
       fetchedAt: moment.utc().toISOString(),
       links: {}
     };
-    return Q.resolve(request);
+    return Q(request);
   }

   _processDocument(request) {
     if (request.shouldSkip()) {
-      return Q.resolve(request);
+      return Q(request);
     }
     let handler = this.processor[request.type];
     handler = handler || this[request.type];
@@ -217,13 +226,13 @@ class Crawler {
     }

     request.document = handler.call(this.processor, request);
-    return Q.resolve(request);
+    return Q(request);
   }

   _storeDocument(request) {
     // See if we should skip storing the document. Test request.store explicitly for false as it may just not be set.
     if (request.shouldSkip() || !this.store || !request.document || request.store === false) {
-      return Q.resolve(request);
+      return Q(request);
     }

     return this.store.upsert(request.document).then((upsert) => {
@@ -234,7 +243,7 @@ class Crawler {

   _deleteFromQueue(request) {
     if (!request.message) {
-      return Q.resolve(request);
+      return Q(request);
     }
     return this.normalQueue.done(request).then(() => { return request; });
   }
diff --git a/lib/request.js b/lib/request.js
index 08bee15..509393b 100644
--- a/lib/request.js
+++ b/lib/request.js
@@ -4,7 +4,6 @@ class Request {
   constructor(type, url) {
     this.type = type;
     this.url = url;
-    this.promises = [];
   }

   addMeta(data) {
diff --git a/test/crawlerTests.js b/test/crawlerTests.js
index 87414af..869e3ee 100644
--- a/test/crawlerTests.js
+++ b/test/crawlerTests.js
@@ -6,6 +6,111 @@ const extend = require('extend');
 const Q = require('q');
 const Request = require('../lib/request');

+describe('Crawler get request', () => {
+  it('should get from the priority queue first', () => {
+    const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
+    const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
+    const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(request => {
+      expect(request.type).to.be.equal('priority');
+      expect(request.originQueue === priority).to.be.true;
+      expect(request.lock).to.be.equal('locked');
+      expect(request.crawlerName).to.be.equal('test');
+      expect(request).to.be.equal(requestBox[0]);
+    });
+  });
+
+  it('should get from the normal queue if no priority', () => {
+    const priority = createBaseQueue({ pop: () => { return Q(null); } });
+    const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
+    const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(request => {
+      expect(request.type).to.be.equal('normal');
+      expect(request.originQueue === normal).to.be.true;
+      expect(request.lock).to.be.equal('locked');
+      expect(request.crawlerName).to.be.equal('test');
+      expect(request).to.be.equal(requestBox[0]);
+    });
+  });
+
+  it('should return a dummy skip/delay request if none are queued', () => {
+    const priority = createBaseQueue({ pop: () => { return Q(null); } });
+    const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(request => {
+      expect(request.type).to.be.equal('wait');
+      expect(request.lock).to.be.undefined;
+      expect(request.shouldSkip()).to.be.true;
+      expect(request.flowControl).to.be.equal('delay');
+      expect(request.crawlerName).to.be.equal('test');
+      expect(request).to.be.equal(requestBox[0]);
+    });
+  });
+
+  it('should throw when normal pop errors', () => {
+    const priority = createBaseQueue({ pop: () => { return Q(null); } });
+    const normal = createBaseQueue({ pop: () => { throw new Error('normal test'); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('normal test')
+    );
+  });
+
+  it('should throw when priority pop errors', () => {
+    const priority = createBaseQueue({ pop: () => { throw new Error('priority test'); } });
+    const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('priority test')
+    );
+  });
+
+  it('should throw when acquire lock errors', () => {
+    const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
+    const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('locker error')
+    );
+  });
+
+  it('should abandon the request when the lock cannot be acquired', () => {
+    const abandoned = [];
+    const priority = createBaseQueue({
+      pop: () => {
+        return Q(new Request('priority', 'http://test'));
+      },
+      abandon: request => {
+        abandoned.push(request);
+        return Q();
+      }
+    });
+    const normal = createBaseQueue({ pop: () => { return Q(null); } });
+    const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
+    const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
+    const requestBox = [];
+    return crawler._getRequest(requestBox, 'test').then(
+      request => assert.fail(),
+      error => {
+        expect(error.message).to.be.equal('locker error');
+        expect(abandoned.length).to.be.equal(1);
+      });
+  });
+});
+
 describe('Crawler fetch', () => {
   it('should skip skipped requests', () => {
     const request = new Request('foo', null);
@@ -114,12 +219,9 @@ describe('Crawler fetch', () => {
     return Q().then(() => {
       return crawler._fetch(request);
     }).then(
-      request => {
-        assert.fail();
-      },
-      error => {
-        expect(error.message.startsWith('Code: 500')).to.be.true;
-      });
+      request => assert.fail(),
+      error => expect(error.message.startsWith('Code: 500')).to.be.true
+    );
   });

   it('should throw for store etag errors', () => {
@@ -129,12 +231,9 @@ describe('Crawler fetch', () => {
     return Q().then(() => {
       return crawler._fetch(request);
     }).then(
-      request => {
-        assert.fail();
-      },
-      error => {
-        expect(error.message).to.be.equal('test');
-      });
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('test')
+    );
   });

   it('should throw for requestor get errors', () => {
@@ -147,12 +246,9 @@ describe('Crawler fetch', () => {
     return Q().then(() => {
       return crawler._fetch(request);
     }).then(
-      request => {
-        assert.fail();
-      },
-      error => {
-        expect(error.message).to.be.equal('test');
-      });
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('test')
+    );
   });

   it('should throw for store get errors', () => {
@@ -167,14 +263,10 @@ describe('Crawler fetch', () => {
     return Q().then(() => {
       return crawler._fetch(request);
     }).then(
-      request => {
-        assert.fail();
-      },
-      error => {
-        expect(error.message).to.be.equal('test');
-      });
+      request => assert.fail(),
+      error => expect(error.message).to.be.equal('test')
+    );
   });
-
 });

 function createResponse(body, code = 200, etag = null) {
@@ -223,8 +315,8 @@ function createLinkHeader(target, previous, next, last) {
   return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
 }

-function createBaseCrawler({normalQueue = createBaseQueue(), priorityQueue = createBaseQueue(), deadLetterQueue = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = null, winston = createBaseLog() } = {}) {
-  return new Crawler(normalQueue, priorityQueue, deadLetterQueue, store, locker, requestor, options, winston);
+function createBaseCrawler({normal = createBaseQueue(), priority = createBaseQueue(), deadLetter = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = null, winston = createBaseLog() } = {}) {
+  return new Crawler(normal, priority, deadLetter, store, locker, requestor, options, winston);
 }

 function createBaseQueue({ pop = null, push = null, done = null, abandon = null} = {}) {