зеркало из https://github.com/microsoft/ghcrawler.git
Merge pull request #11 from Microsoft/jm/productionTweaks
Add crawler service
This commit is contained in:
Коммит
42695a0ae6
1
index.js
1
index.js
|
@ -1,4 +1,5 @@
|
|||
module.exports.crawler = require('./lib/crawler');
|
||||
module.exports.crawlerService = require('./lib/crawlerService');
|
||||
module.exports.eventFinder = require('./lib/eventFinder');
|
||||
module.exports.processor = require('./lib/processor');
|
||||
module.exports.queueSet = require('./lib/queueSet');
|
||||
|
|
|
@ -16,11 +16,57 @@ class Crawler {
|
|||
this.processor = new Processor();
|
||||
}
|
||||
|
||||
start(name) {
|
||||
run(options) {
|
||||
let delay = options.delay;
|
||||
if (delay === -1) {
|
||||
// We are done call the done handler and return without continuing the loop
|
||||
return options.done ? options.done() : null;
|
||||
}
|
||||
setTimeout(() => { this._run(options); }, delay);
|
||||
}
|
||||
|
||||
_run(options) {
|
||||
try {
|
||||
// if this loop got cancelled while sleeping, exit
|
||||
if (options.delay === -1) {
|
||||
return options.done ? options.done() : null;
|
||||
}
|
||||
options.delay = 0;
|
||||
return Q.try(() => this.processOne(options))
|
||||
.then(this.log(this._computeDelay.bind(this, options)))
|
||||
.then(this.log(this.run.bind(this, options), this.log(this.run.bind(this, options))));
|
||||
} catch (error) {
|
||||
// If for some reason we throw all the way out of start, log and restart the loop
|
||||
this.logger.error(new Error('PANIC! Crawl loop exited unexpectedly'));
|
||||
this.logger.error(error);
|
||||
this.run(options);
|
||||
}
|
||||
}
|
||||
_computeDelay(options, request) {
|
||||
let delay = options.delay;
|
||||
if (delay === -1) {
|
||||
return request;
|
||||
}
|
||||
delay = delay || 0;
|
||||
const now = Date.now();
|
||||
const requestGate = now + (request.shouldDelay() ? 2000 : 0);
|
||||
const delayGate = request.nextRequestTime || now;
|
||||
const nextRequestTime = Math.max(requestGate, delayGate, now);
|
||||
delay = Math.max(0, nextRequestTime - now);
|
||||
options.delay = delay;
|
||||
if (delay) {
|
||||
this.logger.verbose(`Crawler: ${options.name} waiting for ${delay}ms`);
|
||||
}
|
||||
return delay;
|
||||
}
|
||||
|
||||
|
||||
|
||||
processOne(options) {
|
||||
let requestBox = [];
|
||||
|
||||
return Q()
|
||||
.then(this.log(this._getRequest.bind(this, requestBox, name)))
|
||||
.then(this.log(this._getRequest.bind(this, requestBox, options)))
|
||||
.then(this.log(this._filter.bind(this)))
|
||||
.then(this.log(this._fetch.bind(this)))
|
||||
.then(this.log(this._convertToDocument.bind(this)))
|
||||
|
@ -29,8 +75,7 @@ class Crawler {
|
|||
.catch(this.log(this._errorHandler.bind(this, requestBox)))
|
||||
.then(this.log(this._completeRequest.bind(this), this._completeRequest.bind(this)))
|
||||
.catch(this.log(this._errorHandler.bind(this, requestBox)))
|
||||
.then(this.log(this._logOutcome.bind(this)))
|
||||
.then(this.log(this._startNext.bind(this, name), this._startNext.bind(this, name)));
|
||||
.then(this.log(this._logOutcome.bind(this)));
|
||||
}
|
||||
|
||||
_errorHandler(requestBox, error) {
|
||||
|
@ -44,7 +89,7 @@ class Crawler {
|
|||
return request;
|
||||
}
|
||||
|
||||
_getRequest(requestBox, name) {
|
||||
_getRequest(requestBox, options) {
|
||||
const self = this;
|
||||
return this.log(this.queues.pop())
|
||||
.then(request => {
|
||||
|
@ -55,7 +100,7 @@ class Crawler {
|
|||
}
|
||||
request.start = Date.now();
|
||||
request.crawler = self;
|
||||
request.crawlerName = name;
|
||||
request.crawlerName = options.name;
|
||||
requestBox[0] = request;
|
||||
request.promises = [];
|
||||
return request;
|
||||
|
@ -135,30 +180,6 @@ class Crawler {
|
|||
}
|
||||
}
|
||||
|
||||
_startNext(name, request) {
|
||||
const now = Date.now();
|
||||
let delay = 0;
|
||||
if (request) {
|
||||
const requestGate = now + (request.shouldDelay() ? 1000 : 0);
|
||||
const delayGate = request.nextRequestTime || now;
|
||||
const nextRequestTime = Math.max(requestGate, delayGate, now);
|
||||
delay = Math.max(0, nextRequestTime - now);
|
||||
}
|
||||
if (delay) {
|
||||
this.logger.verbose(`Crawler: ${name} waiting for ${delay}ms`);
|
||||
}
|
||||
setTimeout(() => {
|
||||
try {
|
||||
this.start(name);
|
||||
} catch (error) {
|
||||
// If for some reason we throw all the way out of start, log and restart the loop
|
||||
this.logger.error(new Error('PANIC! Crawl loop exited unexpectedly'));
|
||||
this.logger.error(error);
|
||||
this._startNext(name, null);
|
||||
}
|
||||
}, delay);
|
||||
}
|
||||
|
||||
_filter(request) {
|
||||
if (!this._shouldInclude(request.type, request.url)) {
|
||||
request.markSkip('Filtered');
|
||||
|
@ -265,8 +286,10 @@ class Crawler {
|
|||
return Q(request);
|
||||
}
|
||||
|
||||
const start = Date.now();
|
||||
return this.store.upsert(request.document).then(upsert => {
|
||||
request.upsert = upsert;
|
||||
request.addMeta({ store: Date.now() - start });
|
||||
return request;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
const Q = require('q');
|
||||
|
||||
class CrawlerService {
|
||||
constructor(crawler, options = null) {
|
||||
this.crawler = crawler;
|
||||
this.loops = [];
|
||||
this.options = options || { count: 0 };
|
||||
}
|
||||
|
||||
ensureInitialized() {
|
||||
return this.crawler.initialize ? this.crawler.initialize() : Q();
|
||||
}
|
||||
|
||||
reconfigure(options) {
|
||||
this.options = Object.assign({}, options);
|
||||
return options.count > 0 ? this.run() : this.stop();
|
||||
}
|
||||
|
||||
run() {
|
||||
return this.ensureInitialized().then(() => {
|
||||
return this.ensureLoops();
|
||||
});
|
||||
}
|
||||
|
||||
_loopComplete(loop) {
|
||||
console.log(`Done loop for ${loop.options.name}`);
|
||||
// this.ensureLoops();
|
||||
}
|
||||
|
||||
ensureLoops() {
|
||||
this.loops = this.loops.filter(loop => loop.running());
|
||||
const running = this.status();
|
||||
const delta = this.options.count - running;
|
||||
if (delta < 0) {
|
||||
for (let i = 0; i < Math.abs(delta); i++) {
|
||||
const loop = this.loops.shift();
|
||||
loop.stop();
|
||||
}
|
||||
} else {
|
||||
for (let i = 0; i < delta; i++) {
|
||||
const loop = new CrawlerLoop(this.crawler, i.toString());
|
||||
loop.run().finally(this._loopComplete.bind(this, loop));
|
||||
this.loops.push(loop);
|
||||
}
|
||||
}
|
||||
return Q();
|
||||
}
|
||||
|
||||
status() {
|
||||
return this.loops.reduce((running, loop) => {
|
||||
return running + (loop.running ? 1 : 0);
|
||||
}, 0);
|
||||
}
|
||||
|
||||
stop() {
|
||||
return this.ensureLoops();
|
||||
}
|
||||
|
||||
getOptions() {
|
||||
return Object.assign({}, this.options);
|
||||
}
|
||||
|
||||
queues() {
|
||||
return this.crawler.queues;
|
||||
}
|
||||
}
|
||||
|
||||
class CrawlerLoop {
|
||||
constructor(crawler, name) {
|
||||
this.crawler = crawler;
|
||||
this.options = { name: name, delay: 0 };
|
||||
this.done = null;
|
||||
this.state = null;
|
||||
}
|
||||
|
||||
running() {
|
||||
return this.state === 'running';
|
||||
}
|
||||
|
||||
run() {
|
||||
if (this.state) {
|
||||
throw new Error(`Loop ${this.options.name} can only be run once`);
|
||||
}
|
||||
this.state = 'running';
|
||||
// Create callback that when run, resolves a promise and completes this loop
|
||||
const doneDeferred = Q.defer();
|
||||
this.done = value => doneDeferred.resolve(value);
|
||||
this.options.done = this.done;
|
||||
const donePromise = doneDeferred.promise;
|
||||
donePromise.finally(() => {
|
||||
this.state = 'stopped';
|
||||
});
|
||||
|
||||
// Kick off the loop and don't worry about the return value.
|
||||
// donePromise will be resolved when the loop is complete.
|
||||
this.crawler.run(this.options);
|
||||
return donePromise;
|
||||
}
|
||||
|
||||
stop() {
|
||||
if (this.state === 'stopped' || this.state === 'stopping') {
|
||||
return;
|
||||
}
|
||||
this.state = 'stopping';
|
||||
// set delay to tell the loop to stop next time around
|
||||
// TODO consider explicitly waking sleeping loops but they will check whether they
|
||||
// should keep running when they wake up.
|
||||
this.options.delay = -1;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = CrawlerService;
|
|
@ -16,7 +16,7 @@ describe('Crawler get request', () => {
|
|||
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
|
||||
const crawler = createBaseCrawler({ queues: queues, locker: locker });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(request => {
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
|
||||
expect(request.type).to.be.equal('priority');
|
||||
expect(request._originQueue === queues.priority).to.be.true;
|
||||
expect(request.lock).to.be.equal('locked');
|
||||
|
@ -32,7 +32,7 @@ describe('Crawler get request', () => {
|
|||
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
|
||||
const crawler = createBaseCrawler({ queues: queues, locker: locker });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(request => {
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
|
||||
expect(request.type).to.be.equal('normal');
|
||||
expect(request._originQueue === queues.normal).to.be.true;
|
||||
expect(request.lock).to.be.equal('locked');
|
||||
|
@ -47,7 +47,7 @@ describe('Crawler get request', () => {
|
|||
const queues = createBaseQueues({ priority: priority, normal: normal });
|
||||
const crawler = createBaseCrawler({ queues: queues });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(request => {
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
|
||||
expect(request.type).to.be.equal('_blank');
|
||||
expect(request.lock).to.be.undefined;
|
||||
expect(request.shouldSkip()).to.be.true;
|
||||
|
@ -63,7 +63,7 @@ describe('Crawler get request', () => {
|
|||
const queues = createBaseQueues({ priority: priority, normal: normal });
|
||||
const crawler = createBaseCrawler({ queues: queues });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(
|
||||
request => assert.fail(),
|
||||
error => expect(error.message).to.be.equal('normal test')
|
||||
);
|
||||
|
@ -75,7 +75,7 @@ describe('Crawler get request', () => {
|
|||
const queues = createBaseQueues({ priority: priority, normal: normal });
|
||||
const crawler = createBaseCrawler({ queues: queues });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(
|
||||
request => assert.fail(),
|
||||
error => expect(error.message).to.be.equal('priority test')
|
||||
);
|
||||
|
@ -88,7 +88,7 @@ describe('Crawler get request', () => {
|
|||
const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
|
||||
const crawler = createBaseCrawler({ queues: queues, locker: locker });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(
|
||||
request => assert.fail(),
|
||||
error => expect(error.message).to.be.equal('locker error')
|
||||
);
|
||||
|
@ -108,7 +108,7 @@ describe('Crawler get request', () => {
|
|||
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
|
||||
const crawler = createBaseCrawler({ queues: queues, locker: locker });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(
|
||||
request => assert.fail(),
|
||||
error => {
|
||||
expect(error.message).to.be.equal('locker error');
|
||||
|
@ -127,7 +127,7 @@ describe('Crawler get request', () => {
|
|||
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
|
||||
const crawler = createBaseCrawler({ queues: queues, locker: locker });
|
||||
const requestBox = [];
|
||||
return crawler._getRequest(requestBox, 'test').then(
|
||||
return crawler._getRequest(requestBox, { name: 'test' }).then(
|
||||
request => assert.fail(),
|
||||
error => {
|
||||
expect(error.message).to.be.equal('locker error');
|
||||
|
@ -883,51 +883,53 @@ describe('Crawler store document', () => {
|
|||
describe('Crawler whole meal deal', () => {
|
||||
it('should delay starting next iteration when markDelay', () => {
|
||||
const crawler = createBaseCrawler();
|
||||
sinon.stub(crawler, 'start', () => Q());
|
||||
const clock = sinon.useFakeTimers();
|
||||
sinon.spy(clock, 'setTimeout');
|
||||
crawler.run = () => { };
|
||||
crawler.processOne = () => { return Q(request) };
|
||||
|
||||
const request = new Request('user', 'http://test.com/users/user1');
|
||||
request.markDelay();
|
||||
|
||||
crawler._startNext('test', request);
|
||||
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(1000);
|
||||
const options = { name: 'foo', delay: 0 };
|
||||
return crawler._run(options).then(() => {
|
||||
expect(options.delay).to.be.equal(2000);
|
||||
});
|
||||
});
|
||||
|
||||
it('should delay starting next iteration when delayUntil', () => {
|
||||
const crawler = createBaseCrawler();
|
||||
sinon.stub(crawler, 'start', () => Q());
|
||||
const clock = sinon.useFakeTimers();
|
||||
sinon.spy(clock, 'setTimeout');
|
||||
crawler.run = () => { };
|
||||
crawler.processOne = () => { return Q(request) };
|
||||
|
||||
const request = new Request('user', 'http://test.com/users/user1');
|
||||
request.delayUntil(323);
|
||||
request.delayUntil(Date.now() + 323);
|
||||
|
||||
crawler._startNext('test', request);
|
||||
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(323);
|
||||
const options = { name: 'foo', delay: 0 };
|
||||
return crawler._run(options).then(() => {
|
||||
expect(options.delay).to.be.approximately(323, 4);
|
||||
});
|
||||
});
|
||||
|
||||
it('should delay starting next iteration when delayFor', () => {
|
||||
const crawler = createBaseCrawler();
|
||||
sinon.stub(crawler, 'start', () => Q());
|
||||
const clock = sinon.useFakeTimers();
|
||||
sinon.spy(clock, 'setTimeout');
|
||||
crawler.run = () => { };
|
||||
crawler.processOne = () => { return Q(request) };
|
||||
|
||||
const request = new Request('user', 'http://test.com/users/user1');
|
||||
request.delayFor(451);
|
||||
|
||||
crawler._startNext('test', request);
|
||||
expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(451);
|
||||
const options = { name: 'foo', delay: 0 };
|
||||
return crawler._run(options).then(() => {
|
||||
expect(options.delay).to.be.approximately(451, 4);
|
||||
});
|
||||
});
|
||||
|
||||
it('should process normal requests', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
|
||||
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
|
||||
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
|
||||
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
|
||||
|
@ -971,7 +973,6 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
it('should handle getRequest reject', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
|
||||
// setup a problem popping
|
||||
const normal = createBaseQueue();
|
||||
|
@ -982,7 +983,7 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
crawler.requestor.responses = [createResponse(null, 500)];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
|
||||
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
|
||||
|
@ -1015,13 +1016,12 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
it('should handle fetch reject', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
|
||||
// setup a good request but a server error response
|
||||
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
|
||||
crawler.requestor.responses = [createResponse(null, 500)];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
|
||||
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
|
||||
|
@ -1062,13 +1062,12 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
it('should handle process document reject', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
crawler.processor = { process: () => { throw new Error('bad processor') } };
|
||||
|
||||
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
|
||||
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
|
||||
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
|
||||
|
@ -1109,13 +1108,12 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
it('should handle store document reject', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
crawler.store = { upsert: () => { throw new Error('bad upsert') } };
|
||||
|
||||
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
|
||||
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
const unlock = crawler.locker.unlock;
|
||||
expect(unlock.callCount).to.be.equal(1);
|
||||
|
@ -1137,13 +1135,12 @@ describe('Crawler whole meal deal', () => {
|
|||
|
||||
it('should handle complete request reject', () => {
|
||||
const crawler = createFullCrawler();
|
||||
sinon.stub(crawler, '_startNext', () => Q());
|
||||
crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
|
||||
|
||||
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
|
||||
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
|
||||
return Q.try(() => {
|
||||
return crawler.start('test');
|
||||
return crawler.processOne({ name: 'test' });
|
||||
}).then(() => {
|
||||
const push = crawler.queues.normal.push;
|
||||
expect(push.callCount).to.be.equal(1);
|
||||
|
|
Загрузка…
Ссылка в новой задаче