This commit is contained in:
Jeff McAffer 2016-11-20 01:13:13 -08:00
Родитель f1d61b4570
Коммит 9177e47555
4 изменённых файлов: 199 добавлений и 66 удалений

Просмотреть файл

@ -1,4 +1,5 @@
// Public API surface: re-export the library's main components so consumers
// can require the package root instead of reaching into lib/.
module.exports.crawler = require('./lib/crawler');
module.exports.crawlerService = require('./lib/crawlerService');
module.exports.eventFinder = require('./lib/eventFinder');
module.exports.processor = require('./lib/processor');
module.exports.queueSet = require('./lib/queueSet');

Просмотреть файл

@ -16,11 +16,57 @@ class Crawler {
this.processor = new Processor();
}
start(name) {
run(options) {
let delay = options.delay;
if (delay === -1) {
// We are done call the done handler and return without continuing the loop
return options.done ? options.done() : null;
}
setTimeout(() => { this._run(options); }, delay);
}
// Execute one crawl iteration and chain the next one via this.run().
// NOTE(review): the try/catch below only guards synchronous throws; rejections
// inside the returned promise chain are handled by the chain itself.
_run(options) {
try {
// if this loop got cancelled while sleeping, exit
if (options.delay === -1) {
return options.done ? options.done() : null;
}
// Reset the delay; _computeDelay raises it again when the request asks for back-off.
options.delay = 0;
return Q.try(() => this.processOne(options))
.then(this.log(this._computeDelay.bind(this, options)))
// NOTE(review): the rejection handler here wraps this.log(...) in another
// this.log(...), while similar call sites (e.g. the _completeRequest chain)
// pass the bound function directly — looks accidental; confirm against log().
.then(this.log(this.run.bind(this, options), this.log(this.run.bind(this, options))));
} catch (error) {
// If for some reason we throw all the way out of start, log and restart the loop
this.logger.error(new Error('PANIC! Crawl loop exited unexpectedly'));
this.logger.error(error);
this.run(options);
}
}
_computeDelay(options, request) {
let delay = options.delay;
if (delay === -1) {
return request;
}
delay = delay || 0;
const now = Date.now();
const requestGate = now + (request.shouldDelay() ? 2000 : 0);
const delayGate = request.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
delay = Math.max(0, nextRequestTime - now);
options.delay = delay;
if (delay) {
this.logger.verbose(`Crawler: ${options.name} waiting for ${delay}ms`);
}
return delay;
}
processOne(options) {
let requestBox = [];
return Q()
.then(this.log(this._getRequest.bind(this, requestBox, name)))
.then(this.log(this._getRequest.bind(this, requestBox, options)))
.then(this.log(this._filter.bind(this)))
.then(this.log(this._fetch.bind(this)))
.then(this.log(this._convertToDocument.bind(this)))
@ -29,8 +75,7 @@ class Crawler {
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._completeRequest.bind(this), this._completeRequest.bind(this)))
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._logOutcome.bind(this)))
.then(this.log(this._startNext.bind(this, name), this._startNext.bind(this, name)));
.then(this.log(this._logOutcome.bind(this)));
}
_errorHandler(requestBox, error) {
@ -44,7 +89,7 @@ class Crawler {
return request;
}
_getRequest(requestBox, name) {
_getRequest(requestBox, options) {
const self = this;
return this.log(this.queues.pop())
.then(request => {
@ -55,7 +100,7 @@ class Crawler {
}
request.start = Date.now();
request.crawler = self;
request.crawlerName = name;
request.crawlerName = options.name;
requestBox[0] = request;
request.promises = [];
return request;
@ -135,30 +180,6 @@ class Crawler {
}
}
_startNext(name, request) {
const now = Date.now();
let delay = 0;
if (request) {
const requestGate = now + (request.shouldDelay() ? 1000 : 0);
const delayGate = request.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
delay = Math.max(0, nextRequestTime - now);
}
if (delay) {
this.logger.verbose(`Crawler: ${name} waiting for ${delay}ms`);
}
setTimeout(() => {
try {
this.start(name);
} catch (error) {
// If for some reason we throw all the way out of start, log and restart the loop
this.logger.error(new Error('PANIC! Crawl loop exited unexpectedly'));
this.logger.error(error);
this._startNext(name, null);
}
}, delay);
}
_filter(request) {
if (!this._shouldInclude(request.type, request.url)) {
request.markSkip('Filtered');
@ -265,8 +286,10 @@ class Crawler {
return Q(request);
}
const start = Date.now();
return this.store.upsert(request.document).then(upsert => {
request.upsert = upsert;
request.addMeta({ store: Date.now() - start });
return request;
});
}

112
lib/crawlerService.js Normal file
Просмотреть файл

@ -0,0 +1,112 @@
const Q = require('q');
class CrawlerService {
  /**
   * Manages a pool of concurrent crawl loops over a single crawler.
   * @param {object} crawler the crawler whose run() drives each loop
   * @param {object} [options] service options; options.count is the number
   *   of loops to keep running (defaults to 0, i.e. stopped)
   */
  constructor(crawler, options = null) {
    this.crawler = crawler;
    this.loops = [];
    this.options = options || { count: 0 };
  }

  // Initialize the underlying crawler if it supports initialization.
  ensureInitialized() {
    return this.crawler.initialize ? this.crawler.initialize() : Q();
  }

  // Replace the options wholesale, then grow or shrink the pool to match.
  reconfigure(options) {
    this.options = Object.assign({}, options);
    return options.count > 0 ? this.run() : this.stop();
  }

  /** Initialize the crawler and reconcile the loop pool. */
  run() {
    return this.ensureInitialized().then(() => {
      return this.ensureLoops();
    });
  }

  _loopComplete(loop) {
    console.log(`Done loop for ${loop.options.name}`);
    // this.ensureLoops();
  }

  /**
   * Reconcile the set of live loops with options.count: prune finished
   * loops, stop surplus ones, and start new ones as needed.
   * @returns {Promise} resolved once the adjustment has been kicked off
   */
  ensureLoops() {
    this.loops = this.loops.filter(loop => loop.running());
    const running = this.status();
    const delta = this.options.count - running;
    if (delta < 0) {
      for (let i = 0; i < Math.abs(delta); i++) {
        const loop = this.loops.shift();
        loop.stop();
      }
    } else {
      for (let i = 0; i < delta; i++) {
        const loop = new CrawlerLoop(this.crawler, i.toString());
        // Fire and forget; _loopComplete logs when a loop winds down.
        loop.run().finally(this._loopComplete.bind(this, loop));
        this.loops.push(loop);
      }
    }
    return Q();
  }

  /**
   * Number of loops currently in the running state.
   * Fix: previously this tested the method reference `loop.running`
   * (always truthy) instead of calling it, so every tracked loop was
   * counted regardless of its actual state.
   */
  status() {
    return this.loops.reduce((running, loop) => {
      return running + (loop.running() ? 1 : 0);
    }, 0);
  }

  // Stopping is just reconciling against the (presumably zero) count.
  stop() {
    return this.ensureLoops();
  }

  /** @returns {object} a defensive copy of the current options */
  getOptions() {
    return Object.assign({}, this.options);
  }

  /** @returns the crawler's queue set */
  queues() {
    return this.crawler.queues;
  }
}
class CrawlerLoop {
  /**
   * A single crawl loop: drives crawler.run() until asked to stop.
   * @param {object} crawler the crawler to drive
   * @param {string} name identifier carried in the loop's options
   */
  constructor(crawler, name) {
    this.crawler = crawler;
    this.options = { name: name, delay: 0 };
    this.done = null;
    this.state = null;
  }

  /** @returns {boolean} true while the loop is actively running */
  running() {
    return this.state === 'running';
  }

  /**
   * Start the loop. May be called at most once per instance.
   * @returns {Promise} resolved when the loop completes
   * @throws {Error} when the loop has already been started
   */
  run() {
    if (this.state) {
      throw new Error(`Loop ${this.options.name} can only be run once`);
    }
    this.state = 'running';
    // Hand the crawler a callback that settles our completion promise.
    const completion = Q.defer();
    this.done = value => completion.resolve(value);
    this.options.done = this.done;
    completion.promise.finally(() => {
      this.state = 'stopped';
    });
    // Fire and forget: completion.promise is the only result we report.
    this.crawler.run(this.options);
    return completion.promise;
  }

  /** Ask the loop to stop on its next iteration; no-op if already stopping. */
  stop() {
    if (this.state === 'stopped' || this.state === 'stopping') {
      return;
    }
    this.state = 'stopping';
    // A delay of -1 tells the crawler's run loop to exit instead of looping.
    // TODO consider explicitly waking sleeping loops but they will check whether they
    // should keep running when they wake up.
    this.options.delay = -1;
  }
}
module.exports = CrawlerService;

Просмотреть файл

@ -16,7 +16,7 @@ describe('Crawler get request', () => {
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('priority');
expect(request._originQueue === queues.priority).to.be.true;
expect(request.lock).to.be.equal('locked');
@ -32,7 +32,7 @@ describe('Crawler get request', () => {
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('normal');
expect(request._originQueue === queues.normal).to.be.true;
expect(request.lock).to.be.equal('locked');
@ -47,7 +47,7 @@ describe('Crawler get request', () => {
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('_blank');
expect(request.lock).to.be.undefined;
expect(request.shouldSkip()).to.be.true;
@ -63,7 +63,7 @@ describe('Crawler get request', () => {
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
return crawler._getRequest(requestBox, { name: 'test' }).then(
request => assert.fail(),
error => expect(error.message).to.be.equal('normal test')
);
@ -75,7 +75,7 @@ describe('Crawler get request', () => {
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
return crawler._getRequest(requestBox, { name: 'test' }).then(
request => assert.fail(),
error => expect(error.message).to.be.equal('priority test')
);
@ -88,7 +88,7 @@ describe('Crawler get request', () => {
const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
return crawler._getRequest(requestBox, { name: 'test' }).then(
request => assert.fail(),
error => expect(error.message).to.be.equal('locker error')
);
@ -108,7 +108,7 @@ describe('Crawler get request', () => {
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
return crawler._getRequest(requestBox, { name: 'test' }).then(
request => assert.fail(),
error => {
expect(error.message).to.be.equal('locker error');
@ -127,7 +127,7 @@ describe('Crawler get request', () => {
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
return crawler._getRequest(requestBox, { name: 'test' }).then(
request => assert.fail(),
error => {
expect(error.message).to.be.equal('locker error');
@ -883,51 +883,53 @@ describe('Crawler store document', () => {
describe('Crawler whole meal deal', () => {
it('should delay starting next iteration when markDelay', () => {
const crawler = createBaseCrawler();
sinon.stub(crawler, 'start', () => Q());
const clock = sinon.useFakeTimers();
sinon.spy(clock, 'setTimeout');
crawler.run = () => { };
crawler.processOne = () => { return Q(request) };
const request = new Request('user', 'http://test.com/users/user1');
request.markDelay();
crawler._startNext('test', request);
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(1000);
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.equal(2000);
});
});
it('should delay starting next iteration when delayUntil', () => {
const crawler = createBaseCrawler();
sinon.stub(crawler, 'start', () => Q());
const clock = sinon.useFakeTimers();
sinon.spy(clock, 'setTimeout');
crawler.run = () => { };
crawler.processOne = () => { return Q(request) };
const request = new Request('user', 'http://test.com/users/user1');
request.delayUntil(323);
request.delayUntil(Date.now() + 323);
crawler._startNext('test', request);
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(323);
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.approximately(323, 4);
});
});
it('should delay starting next iteration when delayFor', () => {
const crawler = createBaseCrawler();
sinon.stub(crawler, 'start', () => Q());
const clock = sinon.useFakeTimers();
sinon.spy(clock, 'setTimeout');
crawler.run = () => { };
crawler.processOne = () => { return Q(request) };
const request = new Request('user', 'http://test.com/users/user1');
request.delayFor(451);
crawler._startNext('test', request);
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(451);
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.approximately(451, 4);
});
});
it('should process normal requests', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
@ -971,7 +973,6 @@ describe('Crawler whole meal deal', () => {
it('should handle getRequest reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// setup a problem popping
const normal = createBaseQueue();
@ -982,7 +983,7 @@ describe('Crawler whole meal deal', () => {
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
@ -1015,13 +1016,12 @@ describe('Crawler whole meal deal', () => {
it('should handle fetch reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
// setup a good request but a server error response
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
@ -1062,13 +1062,12 @@ describe('Crawler whole meal deal', () => {
it('should handle process document reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.processor = { process: () => { throw new Error('bad processor') } };
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
@ -1109,13 +1108,12 @@ describe('Crawler whole meal deal', () => {
it('should handle store document reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.store = { upsert: () => { throw new Error('bad upsert') } };
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(1);
@ -1137,13 +1135,12 @@ describe('Crawler whole meal deal', () => {
it('should handle complete request reject', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
return crawler.processOne({ name: 'test' });
}).then(() => {
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);