This commit is contained in:
Jeff McAffer 2016-11-20 22:45:18 -08:00
Parent 40713f46b7
Commit 5de53a5501
3 changed files: 217 additions and 156 deletions

View File

@@ -22,6 +22,9 @@ class Crawler {
// We are done. Call the done handler and return without continuing the loop
return options.done ? options.done() : null;
}
if (delay) {
this.logger.verbose(`Crawler: ${options.name} waiting for ${delay}ms`);
}
setTimeout(() => { this._run(options); }, delay);
}
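For reference, a minimal standalone sketch of the reschedule pattern in the loop above; the stop flag and logger shape are illustrative assumptions, and only `options.done`, the verbose delay log, and the setTimeout rescheduling come from the diff.

// Sketch only: `options.stopRequested` is a stand-in for the real stop check,
// which sits above this hunk in crawler.js.
function rescheduleLoop(run, options, delay, logger) {
  if (options.stopRequested) {
    // We are done. Call the caller-supplied completion handler, if any, and stop looping.
    return options.done ? options.done() : null;
  }
  if (delay) {
    logger.verbose(`Crawler: ${options.name} waiting for ${delay}ms`);
  }
  setTimeout(() => { run(options); }, delay);
  return null;
}

// Example: wait 250ms, then run the loop body again.
rescheduleLoop(opts => console.log(`next cycle for ${opts.name}`), { name: 'foo', delay: 0 }, 250, { verbose: console.log });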
@@ -33,35 +36,39 @@ class Crawler {
}
options.delay = 0;
return Q.try(() => this.processOne(options))
.then(this.log(this._computeDelay.bind(this, options)))
.then(this.log(this.run.bind(this, options), this.log(this.run.bind(this, options))));
.then(this.log(this._computeDelay.bind(this, options)), this._panic.bind(this, options))
.finally(this.log(this.run.bind(this, options)));
} catch (error) {
// If for some reason we throw all the way out of start, log and restart the loop
this.logger.error(new Error('PANIC! Crawl loop exited unexpectedly'));
this.logger.error(error);
this._panic(options, error);
this.run(options);
}
}
_computeDelay(options, request) {
_panic(options, error) {
this.logger.error(new Error('PANIC, we should not have gotten here'));
this.logger.error(error);
}
_computeDelay(options, delaySpec) {
let delay = options.delay;
if (delay === -1) {
return request;
return delay;
}
delay = delay || 0;
const now = Date.now();
const requestGate = now + (request.shouldDelay() ? 2000 : 0);
const delayGate = request.nextRequestTime || now;
const requestGate = now + (delaySpec.shouldDelay() ? 2000 : 0);
const delayGate = delaySpec.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
delay = Math.max(0, nextRequestTime - now);
options.delay = delay;
if (delay) {
this.logger.verbose(`Crawler: ${options.name} waiting for ${delay}ms`);
}
return delay;
}
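A hedged, standalone illustration of the gating arithmetic in _computeDelay; the delaySpec values below are invented for the example and are not from the project.

// The later of the two gates wins: a throttled request pushes the next cycle
// out by at least 2000ms, but an explicit nextRequestTime further in the future
// governs instead.
const now = Date.now();
const delaySpec = {
  shouldDelay: () => true,        // the request asked to be throttled
  nextRequestTime: now + 5000     // e.g. a rate-limit reset five seconds out
};
const requestGate = now + (delaySpec.shouldDelay() ? 2000 : 0);   // now + 2000
const delayGate = delaySpec.nextRequestTime || now;               // now + 5000
const delay = Math.max(0, Math.max(requestGate, delayGate, now) - now);
console.log(delay);  // 5000: the later gate determines the wait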
/**
* Process one request cycle. If an error happens during processing, handle it there and
* return a spec describing any delays that should be in effect before the next cycle.
*/
processOne(options) {
let requestBox = [];
@@ -75,12 +82,18 @@ class Crawler {
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._completeRequest.bind(this), this._completeRequest.bind(this)))
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._logOutcome.bind(this)));
.then(this.log(this._logOutcome.bind(this)))
.catch(this.log(this._errorHandler.bind(this, requestBox)));
}
_errorHandler(requestBox, error) {
if (requestBox[0]) {
return requestBox[0].markRequeue('Error', error);
if (requestBox[0].type === '_errorTrap') {
// TODO: if there is a subsequent error, just capture the first and carry on for now. Likely should log it.
return requestBox[0];
} else {
return requestBox[0].markRequeue('Error', error);
}
}
const request = new Request('_errorTrap', null);
request.markDelay();
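The requestBox/_errorTrap pattern above can be restated as a standalone sketch: processOne stashes the popped request in a one-element array so later error handlers can reach it, and when nothing has been captured yet, a synthetic '_errorTrap' request marked for delay stands in. The tail of the handler (storing the trap back into the box and returning it) is assumed here, since the hunk is cut off, and the require path is illustrative.

const Request = require('./lib/request');  // path assumed for the sketch

function errorHandler(requestBox, error) {
  if (requestBox[0]) {
    // A real request was captured; requeue it unless it is already the trap itself.
    return requestBox[0].type === '_errorTrap'
      ? requestBox[0]
      : requestBox[0].markRequeue('Error', error);
  }
  // Nothing was popped yet: synthesize a trap request so the loop still backs off.
  const request = new Request('_errorTrap', null);
  request.markDelay();
  requestBox[0] = request;   // assumed: keep the trap visible to later handlers
  return request;
}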

View File

@@ -1,9 +1,10 @@
const extend = require('extend');
class Request {
constructor(type, url) {
constructor(type, url, context = null) {
this.type = type;
this.url = url;
this.context = context || {};
this.promises = [];
}
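A hedged usage of the new optional context argument; the require path and the property names inside the context object are illustrative, not from the project.

const Request = require('./lib/request');

const scoped = new Request('user', 'http://test.com/users/user1', { qualifier: 'urn:user:42' });
console.log(scoped.context.qualifier);   // 'urn:user:42'

const bare = new Request('repo', 'http://test.com/repos/repo1');
console.log(bare.context);               // {} since context defaults to an empty object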

View File

@@ -880,6 +880,48 @@ describe('Crawler store document', () => {
});
});
describe('Crawler run', () => {
it('should panic for rejects in processOne', () => {
const crawler = createBaseCrawler();
sinon.stub(crawler, 'run', () => { });
sinon.stub(crawler, 'processOne', () => { return Q.reject('test error') });
sinon.spy(crawler, '_computeDelay');
sinon.spy(crawler, '_panic');
const request = new Request('user', 'http://test.com/users/user1');
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(crawler.processOne.callCount).to.be.equal(1);
expect(crawler._computeDelay.callCount).to.be.equal(0);
expect(crawler._panic.callCount).to.be.equal(1);
expect(crawler.run.callCount).to.be.equal(1);
}, error => {
console.log(error);
});
});
it('should panic for errors in processOne', () => {
const crawler = createBaseCrawler();
sinon.stub(crawler, 'run', () => { });
sinon.stub(crawler, 'processOne', () => { throw new Error('test error') });
sinon.spy(crawler, '_computeDelay');
sinon.spy(crawler, '_panic');
const request = new Request('user', 'http://test.com/users/user1');
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(crawler.processOne.callCount).to.be.equal(1);
expect(crawler._computeDelay.callCount).to.be.equal(0);
expect(crawler._panic.callCount).to.be.equal(1);
expect(crawler.run.callCount).to.be.equal(1);
}, error => {
console.log(error);
});
});
});
describe('Crawler whole meal deal', () => {
it('should delay starting next iteration when markDelay', () => {
const crawler = createBaseCrawler();
@@ -928,43 +970,43 @@ describe('Crawler whole meal deal', () => {
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1, 'lock call count');
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1, 'lock call count');
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const process = crawler.processor.process;
expect(process.callCount).to.be.equal(1);
expect(process.getCall(0).args[0].type).to.be.equal('user');
const process = crawler.processor.process;
expect(process.callCount).to.be.equal(1);
expect(process.getCall(0).args[0].type).to.be.equal('user');
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(1);
const document = upsert.getCall(0).args[0];
expect(document.id).to.be.equal(42);
expect(document._metadata.url).to.be.equal('http://test.com/users/user1');
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(1);
const document = upsert.getCall(0).args[0];
expect(document.id).to.be.equal(42);
expect(document._metadata.url).to.be.equal('http://test.com/users/user1');
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
});
expect(crawler.logger.info.callCount).to.be.equal(1);
},
error => assert.fail());
});
it('should empty request queues', () => {
@@ -982,36 +1024,36 @@ describe('Crawler whole meal deal', () => {
crawler.queues.normal = normal;
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(0);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(0);
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(0);
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(0);
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(0);
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(0);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(0);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(0);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(0);
expect(crawler.queues.normal.done.callCount).to.be.equal(0);
expect(crawler.queues.normal.done.callCount).to.be.equal(0);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message).to.be.equal('cant pop');
});
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message).to.be.equal('cant pop');
},
error => assert.fail());
});
it('should handle fetch reject', () => {
@@ -1020,44 +1062,44 @@ describe('Crawler whole meal deal', () => {
// setup a good request but a server error response
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message.includes('500')).to.be.true;
});
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error.message.includes('500')).to.be.true;
},
error => assert.fail());
});
it('should handle process document reject', () => {
@@ -1066,44 +1108,46 @@ describe('Crawler whole meal deal', () => {
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
expect(lock.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const etag = crawler.store.etag;
expect(etag.callCount).to.be.equal(1);
expect(etag.getCall(0).args[0]).to.be.equal('user');
expect(etag.getCall(0).args[1]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
expect(crawler._errorHandler.callCount).to.be.equal(1);
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
const upsert = crawler.store.upsert;
expect(upsert.callCount).to.be.equal(0);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
},
error => assert.fail());
});
it('should handle store document reject', () => {
@@ -1112,25 +1156,25 @@ describe('Crawler whole meal deal', () => {
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
},
error => assert.fail());
});
it('should handle complete request reject', () => {
@@ -1139,19 +1183,19 @@ describe('Crawler whole meal deal', () => {
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.processOne({ name: 'test' });
}).then(() => {
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
expect(newRequest.attemptCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
});
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
expect(error instanceof Error).to.be.true;
},
error => assert.fail());
});
});
@@ -1164,7 +1208,7 @@ function createFullCrawler() {
normal.requests = [];
sinon.stub(normal, 'pop', () => { return Q(normal.requests.shift()); });
sinon.stub(normal, 'push', request => { return Q(); });
sinon.spy(normal, 'done');
sinon.stub(normal, 'done', request => { return Q(); });
const queues = createBaseQueues({ priority: priority, normal: normal });
@@ -1194,6 +1238,9 @@ function createFullCrawler() {
const result = createBaseCrawler({ queues: queues, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
result.processor = processor;
sinon.spy(result, '_errorHandler');
return result;
}
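The switch from sinon.spy to sinon.stub for normal.done means the tests now replace the queue's done() with one that always resolves, rather than calling through to the real implementation. A hedged standalone contrast, using the same legacy sinon 1.x three-argument stub form and Q that the tests appear to use:

const sinon = require('sinon');
const Q = require('q');

const stubbed = { done: request => { throw new Error('real done should not run'); } };
sinon.stub(stubbed, 'done', request => { return Q(); });   // replaces the implementation
stubbed.done({}).then(() => console.log('stubbed done resolved'));

const spied = { done: request => console.log(`real done ran for ${request.type}`) };
sinon.spy(spied, 'done');                                   // wraps and calls through
spied.done({ type: 'user' });
console.log(spied.done.callCount);   // 1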