move process() to Processor and more tests!

This commit is contained in:
Jeff McAffer 2016-11-13 23:38:43 -08:00
Parent 05443509c6
Commit a9817362ae
4 changed files with 248 additions and 43 deletions

View file

@ -39,8 +39,9 @@ class Crawler {
if (requestBox[0]) {
return requestBox[0].markRequeue('Error', error);
}
const request = new Request('wait', null);
const request = new Request('_errorTrap', null);
request.markDelay();
request.markSkip('Error', error);
requestBox[0] = request;
return request;
}
@ -52,7 +53,7 @@ class Crawler {
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
if (!request) {
request = new Request('wait', null);
request = new Request('_blank', null);
request.markDelay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
@ -125,7 +126,7 @@ class Crawler {
return Q.all(request.promises)
.finally(() => self._releaseLock(request))
.finally(() => self._deleteFromQueue(request))
.then(() => request, () => request);
.then(() => request);
}
_requeue(request) {
@ -226,14 +227,7 @@ class Crawler {
if (request.shouldSkip()) {
return Q(request);
}
let handler = this.processor[request.type];
handler = handler || this[request.type];
if (!handler) {
request.markSkip('Warning', `No handler found for request type: ${request.type}`);
return Q(request);
}
request.document = handler.call(this.processor, request);
request.document = this.processor.process(request);
return Q(request);
}

View file

@ -5,6 +5,17 @@ const URL = require('url');
// Processor dispatches a crawl request to the handler method whose name
// matches the request type (e.g. this.collection handles type 'collection').
// The type-specific handler methods follow this block in the full source file.
class Processor {
  constructor() {
  }

  /**
   * Look up and invoke the handler for request.type.
   * When no handler is registered, mark the request skipped with a warning
   * and return the request's existing (possibly undefined) document unchanged.
   * @param {Request} request - the crawl request to process.
   * @returns {*} the document produced by the handler, or request.document
   *   when no handler exists for the type.
   */
  process(request) {
    // Bug fix: the original performed the identical lookup twice
    // (`let handler = this[request.type]; handler = handler || this[request.type];`),
    // a leftover from the Crawler version where the first lookup consulted
    // this.processor. A single lookup suffices; also require the property to
    // be callable so a non-function property is not "found" as a handler.
    const handler = this[request.type];
    if (typeof handler !== 'function') {
      request.markSkip('Warning', `No handler found for request type: ${request.type}`);
      return request.document;
    }
    return handler.call(this, request);
  }
}
collection(request) {

View file

@ -76,6 +76,9 @@ class Request {
}
markSkip(outcome, message) {
if (this.shouldSkip()) {
return this;
}
this.processControl = 'skip';
this.outcome = this.outcome || outcome;
this.message = this.message || message;
@ -83,6 +86,9 @@ class Request {
}
markRequeue(outcome, message) {
if (this.shouldRequeue()) {
return this;
}
this.processControl = 'requeue';
this.outcome = this.outcome || outcome;
this.message = this.message || message;
@ -94,6 +100,9 @@ class Request {
}
markDelay() {
if (this.shouldDelay()) {
return this;
}
this.flowControl = 'delay';
return this;
}

View file

@ -44,7 +44,7 @@ describe('Crawler get request', () => {
const crawler = createBaseCrawler({ normal: normal, priority: priority });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
expect(request.type).to.be.equal('wait');
expect(request.type).to.be.equal('_blank');
expect(request.lock).to.be.undefined;
expect(request.shouldSkip()).to.be.true;
expect(request.flowControl).to.be.equal('delay');
@ -323,8 +323,7 @@ describe('Crawler error handler', () => {
const crawler = createBaseCrawler();
const error = 'error';
const request = crawler._errorHandler(box, error);
expect(request.shouldSkip()).to.be.false;
expect(request.shouldRequeue()).to.be.false;
expect(request.message).to.be.equal(error);
expect(request.flowControl).to.be.equal('delay');
});
});
@ -608,15 +607,13 @@ describe('Crawler complete request', () => {
originalRequest.promises = [Q.reject(13)];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => {
expect(request === originalRequest).to.be.true;
expect(request.lock).to.be.null;
request => assert.fail(),
error => {
expect(done.length).to.be.equal(1);
expect(done[0] === request).to.be.true;
expect(done[0] === originalRequest).to.be.true;
expect(unlock.length).to.be.equal(1);
expect(unlock[0]).to.be.equal(42);
},
error => assert.fail());
});
});
it('still dequeues when unlocking fails', () => {
@ -651,14 +648,12 @@ describe('Crawler complete request', () => {
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => {
expect(request === originalRequest).to.be.true;
expect(request.lock).to.be.null;
request => assert.fail(),
error => {
expect(done.length).to.be.equal(0);
expect(unlock.length).to.be.equal(1);
expect(unlock[0]).to.be.equal(42);
},
error => assert.fail());
});
});
});
@ -728,16 +723,16 @@ describe('Crawler process document', () => {
const doc = {};
originalRequest.document = doc;
const crawler = createBaseCrawler();
const requestBox = [];
crawler.test = request => {
requestBox[0] = 42;
const processorBox = [];
crawler.processor.test = request => {
processorBox[0] = 42;
request.document.cool = 'content';
return request.document;
};
return crawler._processDocument(originalRequest).then(request => {
expect(request === originalRequest).to.be.true;
expect(requestBox.length).to.be.equal(1);
expect(requestBox[0]).to.be.equal(42);
expect(processorBox.length).to.be.equal(1);
expect(processorBox[0]).to.be.equal(42);
expect(request.document === doc).to.be.true;
expect(request.document.cool).to.be.equal('content');
});
@ -757,7 +752,7 @@ describe('Crawler process document', () => {
const originalRequest = new Request('test', 'http://test.com');
originalRequest.document = {};
const crawler = createBaseCrawler();
crawler.test = request => { throw new Error('bummer'); };
crawler.processor.test = request => { throw new Error('bummer'); };
return Q.try(() => {
crawler._processDocument(originalRequest)
}).then(
@ -805,12 +800,12 @@ describe('Crawler store document', () => {
});
describe('Crawler whole meal deal', () => {
it('should process normal requests (sinon)', () => {
it('should process normal requests', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: '42', repos_url: 'http://test.com/users/user1/repos' })];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
@ -821,23 +816,23 @@ describe('Crawler whole meal deal', () => {
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(1);
expect(push.getCall(0).args[0].type).to.be.equal('repos');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const process = crawler.processor.process;
expect(process.callCount).to.be.equal(1);
expect(process.getCall(0).args[0].type).to.be.equal('user');
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(1);
const document = upsert.getCall(0).args[0];
expect(document.id).to.be.equal('42');
expect(document.id).to.be.equal(42);
expect(document._metadata.url).to.be.equal('http://test.com/users/user1');
const unlock = crawler.locker.unlock;
@ -846,9 +841,199 @@ describe('Crawler whole meal deal', () => {
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.logger.info.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
});
});
// TODO(review): placeholder test — no body or assertions yet; implement or remove.
it('should empty request queues', () => {
});
// Verifies the crawler's error path when the queue itself fails: a throwing
// pop() must be caught and logged, and must short-circuit the whole cycle
// (no lock, no etag check, no fetch, no push, no completion).
it('should handle getRequest reject', () => {
const crawler = createFullCrawler();
// Prevent the crawler from scheduling another processing cycle after this one.
sinon.stub(crawler, '_startNext', () => Q());
// setup a problem popping
const normal = createBaseQueue();
sinon.stub(normal, 'pop', () => { throw Error('cant pop') });
sinon.stub(normal, 'push', request => { return Q(); });
sinon.spy(normal, 'done');
crawler.normalQueue = normal;
// NOTE(review): this canned response is never consumed — the pop failure
// happens before any fetch (requestorGet.callCount is asserted to be 0 below).
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
// Both queues were consulted before the failure surfaced.
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
// Nothing downstream of the failed pop should have run.
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(0);
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(0);
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(0);
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(0);
expect(crawler.normalQueue.done.callCount).to.be.equal(0);
// The pop error itself must be logged exactly once.
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message).to.be.equal('cant pop');
});
});
// Verifies that a server error (HTTP 500) during fetch causes the request to
// be requeued with an incremented attempt count, the lock released, and the
// error logged — but no document stored.
it('should handle fetch reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// setup a good request but a server error response
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
// The request got as far as locking and the etag lookup before fetching.
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
// The failed request is requeued with attemptCount bumped to 1.
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
// No document should be stored for a failed fetch.
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
// The lock is still released and the original request marked done.
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message.includes('500')).to.be.true;
});
});
// Verifies that a throwing processor causes the request to be requeued for a
// retry (attemptCount bumped), the lock released, and the error logged —
// while all the earlier pipeline stages (lock, etag, fetch) still ran once.
it('should handle process document reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// Replace the processor wholesale so process() always throws.
crawler.processor = { process: () => { throw new Error('bad processor') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
// Everything before processing ran normally.
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
// The failed request is requeued for retry.
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
// No document stored when processing failed.
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
});
// Verifies that a throwing store (upsert) still results in the request being
// requeued for retry, the lock released, the request marked done, and the
// error logged.
it('should handle store document reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// Replace the store wholesale so upsert() always throws.
crawler.store = { upsert: () => { throw new Error('bad upsert') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
// Cleanup still happens despite the store failure.
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
// The failed request is requeued with the attempt count bumped.
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
});
// Verifies that a failure during request completion (unlock throwing) still
// requeues the request for retry and logs the errors (two: the completion
// failure plus the resulting cycle error).
it('should handle complete request reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// Replace the locker wholesale so unlock() always throws.
crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
// The request is requeued for retry despite the completion failure.
const push = crawler.normalQueue.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
// NOTE(review): two error log calls are expected here — presumably the
// unlock failure and the overall cycle failure; confirm against Crawler.
expect(crawler.logger.error.callCount).to.be.equal(2);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
});
});
@ -881,9 +1066,15 @@ function createFullCrawler() {
sinon.spy(logger, 'info');
sinon.spy(logger, 'error');
const Processor = require('../lib/processor');
const processor = new Processor();
sinon.spy(processor, 'process');
const config = [];
return createBaseCrawler({ normal: normal, priority: priority, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
const result = createBaseCrawler({ normal: normal, priority: priority, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
result.processor = processor;
return result;
}
function createResponse(body, code = 200, etag = null) {