improve error and promise handling

This commit is contained in:
Jeff McAffer 2016-11-12 18:36:14 -08:00
Родитель 1924857ebf
Коммит 18599eb7fa
3 изменённых файлов: 177 добавлений и 77 удалений

Просмотреть файл

@ -47,6 +47,7 @@ class Crawler {
_getRequest(requestBox, name) {
const self = this;
return Q().then(() => {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
@ -59,38 +60,56 @@ class Crawler {
request.crawler = self;
request.crawlerName = name;
requestBox[0] = request;
request.promises = [];
return request;
});
})
.then(this._acquireLock.bind(this));
}
_acquireLock(request) {
if (request.url && this.locker) {
return this.locker.lock(request.url, 5 * 60 * 1000).then((lock) => {
request.lock = lock;
return request;
}, error => {
return request.originQueue.abandon(request).finally(() => {
throw error;
});
_pop(queue, request = null) {
return Q().then(() => {
return request ? request : queue.pop();
}).then(result => {
if (result && !result.originQueue) {
result.originQueue = queue;
}
return result;
});
}
_acquireLock(request) {
if (!request.url || !this.locker) {
return Q(request);
}
const self = this;
return Q().then(() => {
return self.locker.lock(request.url, 5 * 60 * 1000);
}).then(
lock => {
request.lock = lock;
return request;
},
error => {
return Q().then(() => { request.originQueue.abandon(request); })
.finally(() => { throw error; });
});
}
_releaseLock(request) {
if (!request.url || !this.locker) {
return Q(request);
}
const self = this;
if (request.lock && this.locker) {
return this.locker.unlock(request.lock).then(
() => {
return request;
}, error => {
return Q().then(() => {
return this.locker.unlock(request.lock);
}).then(
() => request,
error => {
self.logger.error(error);
return request;
});
}
return Q(request);
}
_completeRequest(request) {
// requeue the request if needed then wait for any accumulated promises and release the lock and clean up the queue
@ -117,16 +136,6 @@ class Crawler {
}
}
_pop(queue, request = null) {
const self = this;
return (request ? Q(request) : queue.pop()).then(result => {
if (result) {
result.originQueue = queue;
}
return result;
});
}
_startNext(name, request) {
const delay = request.shouldDelay() ? 1000 : 0;
setTimeout(this.start.bind(this, name), delay);
@ -189,7 +198,7 @@ class Crawler {
_convertToDocument(request) {
if (request.shouldSkip()) {
return Q.resolve(request);
return Q(request);
}
// If the doc is an array, wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
@ -203,12 +212,12 @@ class Crawler {
fetchedAt: moment.utc().toISOString(),
links: {}
};
return Q.resolve(request);
return Q(request);
}
_processDocument(request) {
if (request.shouldSkip()) {
return Q.resolve(request);
return Q(request);
}
let handler = this.processor[request.type];
handler = handler || this[request.type];
@ -217,13 +226,13 @@ class Crawler {
}
request.document = handler.call(this.processor, request);
return Q.resolve(request);
return Q(request);
}
_storeDocument(request) {
// See if we should skip storing the document. Test request.store explicitly for false as it may just not be set.
if (request.shouldSkip() || !this.store || !request.document || request.store === false) {
return Q.resolve(request);
return Q(request);
}
return this.store.upsert(request.document).then((upsert) => {
@ -234,7 +243,7 @@ class Crawler {
_deleteFromQueue(request) {
if (!request.message) {
return Q.resolve(request);
return Q(request);
}
return this.normalQueue.done(request).then(() => { return request; });
}

Просмотреть файл

@ -4,7 +4,6 @@ class Request {
constructor(type, url) {
this.type = type;
this.url = url;
this.promises = [];
}
addMeta(data) {

Просмотреть файл

@ -6,6 +6,111 @@ const extend = require('extend');
const Q = require('q');
const Request = require('../lib/request');
// Exercises Crawler._getRequest: queue precedence, the placeholder request returned when
// both queues are empty, and error propagation from pop() and lock().
describe('Crawler get request', () => {
  // pop() stub resolving to a brand-new Request of the given type on every call.
  const popping = type => () => Q(new Request(type, 'http://test'));
  // pop() stub for a queue that has nothing to hand out.
  const poppingNothing = () => Q(null);
  // pop()/lock() stub that throws synchronously with the given message.
  const throwing = message => () => { throw new Error(message); };
  // Locker stub whose lock() always resolves to the token 'locked'.
  const grantingLocker = () => createBaseLocker({ lock: () => Q('locked') });

  // Builds a crawler from `parts` and asks it for a request under the name 'test'.
  const fetchWith = parts => {
    const requestBox = [];
    const crawler = createBaseCrawler(parts);
    return { requestBox, pending: crawler._getRequest(requestBox, 'test') };
  };

  // Checks the bookkeeping on a request that was popped from `queue` and locked.
  const expectFetched = (request, requestBox, type, queue) => {
    expect(request.type).to.be.equal(type);
    expect(request.originQueue === queue).to.be.true;
    expect(request.lock).to.be.equal('locked');
    expect(request.crawlerName).to.be.equal('test');
    expect(request).to.be.equal(requestBox[0]);
  };

  // Fails if `pending` fulfills; otherwise checks the rejection message.
  const expectFailure = (pending, message) => pending.then(
    () => assert.fail(),
    error => expect(error.message).to.be.equal(message)
  );

  it('should get from the priority queue first', () => {
    const priority = createBaseQueue({ pop: popping('priority') });
    const normal = createBaseQueue({ pop: popping('normal') });
    const { requestBox, pending } = fetchWith({ normal: normal, priority: priority, locker: grantingLocker() });
    return pending.then(request => {
      expectFetched(request, requestBox, 'priority', priority);
    });
  });

  it('should get from the normal queue if no priority', () => {
    const priority = createBaseQueue({ pop: poppingNothing });
    const normal = createBaseQueue({ pop: popping('normal') });
    const { requestBox, pending } = fetchWith({ normal: normal, priority: priority, locker: grantingLocker() });
    return pending.then(request => {
      expectFetched(request, requestBox, 'normal', normal);
    });
  });

  it('should return a dummy skip/delay request if none are queued', () => {
    const priority = createBaseQueue({ pop: poppingNothing });
    const normal = createBaseQueue({ pop: poppingNothing });
    const { requestBox, pending } = fetchWith({ normal: normal, priority: priority });
    return pending.then(request => {
      expect(request.type).to.be.equal('wait');
      expect(request.lock).to.be.undefined;
      expect(request.shouldSkip()).to.be.true;
      expect(request.flowControl).to.be.equal('delay');
      expect(request.crawlerName).to.be.equal('test');
      expect(request).to.be.equal(requestBox[0]);
    });
  });

  it('should throw when normal pop errors', () => {
    const priority = createBaseQueue({ pop: poppingNothing });
    const normal = createBaseQueue({ pop: throwing('normal test') });
    const { pending } = fetchWith({ normal: normal, priority: priority });
    return expectFailure(pending, 'normal test');
  });

  it('should throw when priority pop errors', () => {
    const priority = createBaseQueue({ pop: throwing('priority test') });
    const normal = createBaseQueue({ pop: poppingNothing });
    const { pending } = fetchWith({ normal: normal, priority: priority });
    return expectFailure(pending, 'priority test');
  });

  it('should throw when acquire lock errors', () => {
    const priority = createBaseQueue({ pop: popping('priority') });
    const normal = createBaseQueue({ pop: poppingNothing });
    const locker = createBaseLocker({ lock: throwing('locker error') });
    const { pending } = fetchWith({ normal: normal, priority: priority, locker: locker });
    return expectFailure(pending, 'locker error');
  });

  it('should abandon the request when the lock cannot be acquired', () => {
    // Records every request handed to abandon() so the test can count them.
    const abandoned = [];
    const priority = createBaseQueue({
      pop: popping('priority'),
      abandon: request => {
        abandoned.push(request);
        return Q();
      }
    });
    const normal = createBaseQueue({ pop: poppingNothing });
    const locker = createBaseLocker({ lock: () => Q.reject(new Error('locker error')) });
    const { pending } = fetchWith({ normal: normal, priority: priority, locker: locker });
    return pending.then(
      () => assert.fail(),
      error => {
        expect(error.message).to.be.equal('locker error');
        expect(abandoned.length).to.be.equal(1);
      });
  });
});
describe('Crawler fetch', () => {
it('should skip skipped requests', () => {
const request = new Request('foo', null);
@ -114,12 +219,9 @@ describe('Crawler fetch', () => {
return Q().then(() => {
return crawler._fetch(request);
}).then(
request => {
assert.fail();
},
error => {
expect(error.message.startsWith('Code: 500')).to.be.true;
});
request => assert.fail(),
error => expect(error.message.startsWith('Code: 500')).to.be.true
);
});
it('should throw for store etag errors', () => {
@ -129,12 +231,9 @@ describe('Crawler fetch', () => {
return Q().then(() => {
return crawler._fetch(request);
}).then(
request => {
assert.fail();
},
error => {
expect(error.message).to.be.equal('test');
});
request => assert.fail(),
error => expect(error.message).to.be.equal('test')
);
});
it('should throw for requestor get errors', () => {
@ -147,12 +246,9 @@ describe('Crawler fetch', () => {
return Q().then(() => {
return crawler._fetch(request);
}).then(
request => {
assert.fail();
},
error => {
expect(error.message).to.be.equal('test');
});
request => assert.fail(),
error => expect(error.message).to.be.equal('test')
);
});
it('should throw for store get errors', () => {
@ -167,14 +263,10 @@ describe('Crawler fetch', () => {
return Q().then(() => {
return crawler._fetch(request);
}).then(
request => {
assert.fail();
},
error => {
expect(error.message).to.be.equal('test');
request => assert.fail(),
error => expect(error.message).to.be.equal('test')
);
});
});
});
function createResponse(body, code = 200, etag = null) {
@ -223,8 +315,8 @@ function createLinkHeader(target, previous, next, last) {
return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
}
function createBaseCrawler({normalQueue = createBaseQueue(), priorityQueue = createBaseQueue(), deadLetterQueue = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = null, winston = createBaseLog() } = {}) {
return new Crawler(normalQueue, priorityQueue, deadLetterQueue, store, locker, requestor, options, winston);
// Builds a Crawler for tests. Every collaborator can be overridden through the options
// object; anything omitted falls back to the matching createBase* stub (options to null).
// The locker default calls createBaseLocker() so the crawler receives a locker object, not
// the factory function itself — assumes createBaseLocker, like createBaseQueue, can be
// called with no arguments (TODO confirm).
function createBaseCrawler({
  normal = createBaseQueue(),
  priority = createBaseQueue(),
  deadLetter = createBaseQueue(),
  store = createBaseStore(),
  locker = createBaseLocker(),
  requestor = createBaseRequestor(),
  options = null,
  winston = createBaseLog()
} = {}) {
  return new Crawler(normal, priority, deadLetter, store, locker, requestor, options, winston);
}
function createBaseQueue({ pop = null, push = null, done = null, abandon = null} = {}) {