fixup delayUntil and still more tests

This commit is contained in:
Jeff McAffer 2016-11-15 22:21:33 -08:00
Parent 2e0d059bfe
Commit 9a1546439e
3 files changed, 140 insertions and 55 deletions

Просмотреть файл

@ -18,31 +18,6 @@ class Crawler {
this.processor = new Processor();
}
log(thing) {
const self = this;
if (typeof thing === 'function') {
return function () {
const args = array_slice(arguments);
self.logger.verbose(`Promise Function Enter: ${thing.name}`);
const result = thing.apply(self, args);
if (typeof result.then === 'function') {
result.then(
result => { self.logger.verbose(`Promise Function Success: ${thing.name}`); },
error => { self.logger.verbose(`Promise Function Error: ${thing.name}`, error); });
} else {
self.logger.verbose(`Promise Function Exit: ${thing.name}: ${result}`);
}
return result;
};
} else if (typeof thing.then === 'function') {
this.logger.verbose(`Promise Enter`);
thing.then(
result => { this.logger.verbose(`Promise Success: ${result}`); },
error => { this.logger.verbose(`Promise Error: ${result}`, error); });
return thing;
}
}
start(name) {
let requestBox = [];
@ -171,7 +146,7 @@ class Crawler {
_startNext(name, request) {
const now = Date.now();
const requestGate = now + (request.shouldDelay() ? 1000 : 0);
const delayGate = self.delayUntil || now;
const delayGate = request.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
const delay = Math.max(0, nextRequestTime - now);
setTimeout(this.start.bind(this, name), delay);
@ -212,7 +187,7 @@ class Crawler {
// and wait a couple minutes before processing more requests
if (status === 403) {
const delay = 2 * 60 * 1000;
self._delayFor(delay);
request.delayFor(delay);
request.addMeta({ forbiddenDelay: delay });
return request.markRequeue('GitHub throttled: ${request.url}');
}
@ -333,7 +308,7 @@ class Crawler {
// Honor a Retry-After header (seconds) by delaying this request.
// The stale `this._deleteFor(...)` line (a misspelling of _delayFor) is
// dropped in favor of the per-request delayFor.
const retryAfter = parseInt(response.headers['Retry-After'], 10) || 0;
if (retryAfter > 0) {
  request.addMeta({ retryAfterDelay: retryAfter });
  // Retry-After is expressed in seconds; convert to milliseconds.
  request.delayFor(retryAfter * 1000);
}
// If we hit the low water mark for requests, proactively sleep until the next ratelimit reset
@ -342,29 +317,53 @@ class Crawler {
// If remaining tokens are below the configured low-water mark, proactively
// delay this request until the rate-limit reset time.
const tokenLowerBound = this.config ? (this.config.tokenLowerBound || 50) : 50;
if (remaining < tokenLowerBound) {
  const reset = parseInt(response.headers['x-ratelimit-reset'], 10) || 0;
  // BUG FIX (keeping the corrected line): the delay is the time remaining
  // until the reset moment, `reset - Date.now()`. The stale line subtracted
  // from the `Date.now` function reference, always yielding NaN.
  const delay = Math.max(0, reset - Date.now());
  if (delay > 0) {
    request.addMeta({ backoffDelay: delay });
    request.delayUntil(reset);
  }
}
}
_delayUntil(time) {
if (!this.delayUntil || this.delayUntil < time) {
this.delayUntil = time;
// don't mess with the funky method signature formatting. You need spaces around the
// istanbul comment for istanbul to pick it up but auto code formatting removes the spaces
// before the (. Putting a newline seems to keep everyone happy.
log /* istanbul ignore next */
(thing) {
if (!this.config.promiseTrace) {
return thing;
}
const self = this;
if (typeof thing === 'function') {
return function () {
const args = array_slice(arguments);
const name = thing.name.replace('bound ', '');
self.logger.verbose(`Promise Function Enter: ${name}`);
const result = thing.apply(self, args);
if (typeof result.then === 'function') {
result.then(
result => { self.logger.silly(`Promise Function Success: ${name}`); },
error => { self.logger.silly(`Promise Function Error: ${name}`, error); });
} else {
self.logger.verbose(`Promise Function Exit: ${name}: ${result}`);
}
return result;
};
} else if (typeof thing.then === 'function') {
this.logger.silly(`Promise Enter`);
thing.then(
result => { this.logger.silly(`Promise Success: ${result}`); },
error => { this.logger.silly(`Promise Error: ${result}`, error); });
return thing;
}
}
// Crawler-wide variant of delayFor: converts a relative delay in milliseconds
// to an absolute time and records it via _delayUntil.
// NOTE(review): this change also adds Request.delayFor; confirm whether this
// crawler-level method is still needed.
_delayFor(milliseconds) {
  this._delayUntil(Date.now() + milliseconds);
}
}
module.exports = Crawler;
/* istanbul ignore next */
let call = Function.call;
/* istanbul ignore next */
function uncurryThis(f) {
return function () {
return call.apply(f, arguments);
@ -375,5 +374,6 @@ function uncurryThis(f) {
// http://jsperf.com/uncurrythis
/* istanbul ignore next */
let array_slice = uncurryThis(Array.prototype.slice);

Просмотреть файл

@ -111,6 +111,16 @@ class Request {
return this.flowControl === 'delay';
}
delayUntil(time) {
if (!this.nextRequestTime || this.nextRequestTime < time) {
this.nextRequestTime = time;
}
}
// Delay this request by a relative number of milliseconds, expressed as an
// absolute timestamp via delayUntil.
delayFor(milliseconds) {
  this.delayUntil(Date.now() + milliseconds);
}
// True when processing flagged this request to be requeued.
shouldRequeue() {
  return this.processControl === 'requeue';
}

Просмотреть файл

@ -125,7 +125,6 @@ describe('Crawler get request', () => {
expect(abandoned.length).to.be.equal(0);
});
});
});
describe('Crawler fetch', () => {
@ -190,7 +189,39 @@ describe('Crawler fetch', () => {
return crawler._fetch(request).then(request => {
expect(request.document).to.be.undefined;
expect(request.shouldRequeue()).to.be.true;
expect(crawler.delayUntil > Date.now()).to.be.true;
expect(request.nextRequestTime > Date.now()).to.be.true;
});
});
// remaining=30 is below the default tokenLowerBound (50), so _fetch should
// record the reset time on the request via delayUntil without requeuing.
it('should delay on backoff throttling', () => {
  const request = new Request('foo', 'http://test');
  const resetTime = Date.now() + 2000;
  const responses = [createResponse('bar', 200, null, 30, resetTime)];
  const requestor = createBaseRequestor({ get: () => { return Q(responses.shift()); } });
  const store = createBaseStore({ etag: () => { return Q(null); } });
  const crawler = createBaseCrawler({ requestor: requestor, store: store });
  return crawler._fetch(request).then(request => {
    expect(request.document).to.be.equal('bar');
    expect(request.shouldRequeue()).to.be.false;
    expect(request.shouldSkip()).to.be.false;
    expect(request.nextRequestTime).to.be.equal(resetTime);
  });
});
// A Retry-After header of 3 (seconds) should push nextRequestTime roughly
// 3000ms into the future without requeuing or skipping the request.
it('should delay on Retry-After throttling', () => {
  const request = new Request('foo', 'http://test');
  const resetTime = Date.now() + 3000;
  const headers = { 'Retry-After': 3 };
  const responses = [createResponse('bar', 200, null, 30, resetTime, headers)];
  const requestor = createBaseRequestor({ get: () => { return Q(responses.shift()); } });
  const store = createBaseStore({ etag: () => { return Q(null); } });
  const crawler = createBaseCrawler({ requestor: requestor, store: store });
  return crawler._fetch(request).then(request => {
    expect(request.document).to.be.equal('bar');
    expect(request.shouldRequeue()).to.be.false;
    expect(request.shouldSkip()).to.be.false;
    // give at most 100ms for the test to run
    expect(request.nextRequestTime).to.be.within(resetTime, resetTime + 100);
  });
});
@ -299,22 +330,23 @@ describe('Crawler filtering', () => {
// With an orgFilter configured, _filter should mark non-matching requests to
// be skipped; requests for unrecognized types ('foo') pass through.
// BUG FIX: this span interleaved the removed `_shouldInclude` assertions with
// the replacement `_filter` assertions; only the current API is kept.
it('should filter', () => {
  const config = { orgFilter: new Set(['microsoft']) };
  const crawler = createBaseCrawler({ options: config });
  expect(crawler._filter(new Request('repo', 'http://api.github.com/repo/microsoft/test')).shouldSkip()).to.be.false;
  expect(crawler._filter(new Request('repos', 'http://api.github.com/repos/microsoft/test')).shouldSkip()).to.be.false;
  expect(crawler._filter(new Request('org', 'http://api.github.com/org/microsoft/test')).shouldSkip()).to.be.false;
  expect(crawler._filter(new Request('repo', 'http://api.github.com/repo/test/test')).shouldSkip()).to.be.true;
  expect(crawler._filter(new Request('repos', 'http://api.github.com/repos/test/test')).shouldSkip()).to.be.true;
  expect(crawler._filter(new Request('org', 'http://api.github.com/org/test/test')).shouldSkip()).to.be.true;
  expect(crawler._filter(new Request('foo', 'http://api.github.com/org/test/test')).shouldSkip()).to.be.false;
});
// With no orgFilter configured, nothing should be marked for skipping.
// BUG FIX: the removed `_shouldInclude` assertions were interleaved with the
// replacement `_filter` assertions; only the current API is kept.
it('should not filter if no config', () => {
  const config = {};
  const crawler = createBaseCrawler({ options: config });
  expect(crawler._filter(new Request('repo', 'http://api.github.com/repo/microsoft/test')).shouldSkip()).to.be.false;
  expect(crawler._filter(new Request('repo', 'http://api.github.com/repo/test/test')).shouldSkip()).to.be.false;
  expect(crawler._filter(new Request('foo', 'http://api.github.com/repo/test/test')).shouldSkip()).to.be.false;
});
});
@ -815,6 +847,45 @@ describe('Crawler store document', () => {
});
describe('Crawler whole meal deal', () => {
// markDelay should force a 1000ms gate before the next iteration is started.
// NOTE(review): the sinon fake timers are never restored here — confirm later
// tests are unaffected.
it('should delay starting next iteration when markDelay', () => {
  const crawler = createBaseCrawler();
  sinon.stub(crawler, 'start', () => Q());
  const clock = sinon.useFakeTimers();
  sinon.spy(clock, 'setTimeout');
  const request = new Request('user', 'http://test.com/users/user1');
  request.markDelay();
  crawler._startNext('test', request);
  expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(1000);
});
// With fake timers Date.now() is 0, so delayUntil(323) yields a 323ms delay
// to setTimeout.
it('should delay starting next iteration when delayUntil', () => {
  const crawler = createBaseCrawler();
  sinon.stub(crawler, 'start', () => Q());
  const clock = sinon.useFakeTimers();
  sinon.spy(clock, 'setTimeout');
  const request = new Request('user', 'http://test.com/users/user1');
  request.delayUntil(323);
  crawler._startNext('test', request);
  expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(323);
});
// With fake timers Date.now() is 0, so delayFor(451) becomes an absolute time
// of 451 and a 451ms delay to setTimeout.
it('should delay starting next iteration when delayFor', () => {
  const crawler = createBaseCrawler();
  sinon.stub(crawler, 'start', () => Q());
  const clock = sinon.useFakeTimers();
  sinon.spy(clock, 'setTimeout');
  const request = new Request('user', 'http://test.com/users/user1');
  request.delayFor(451);
  crawler._startNext('test', request);
  expect(clock.setTimeout.getCall(0).args[1]).to.be.equal(451);
});
it('should process normal requests', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
@ -1092,12 +1163,14 @@ function createFullCrawler() {
return result;
}
// Build a fake requestor response for tests. Rate-limit defaults are generous
// (remaining=4000, reset=0); explicit `headers` entries override the generated
// ones via Object.assign.
// BUG FIX: this span interleaved the old and new function signatures and
// header bodies (two `function createResponse` lines, two `headers` blocks),
// which is a SyntaxError; only the merged, current version is kept.
function createResponse(body, code = 200, etag = null, remaining = 4000, reset = 0, headers = {}) {
  return {
    statusCode: code,
    headers: Object.assign({
      etag: etag,
      'x-ratelimit-remaining': remaining,
      'x-ratelimit-reset': reset ? reset : 0
    }, headers),
    body: body
  };
}
@ -1138,7 +1211,7 @@ function createLinkHeader(target, previous, next, last) {
return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
}
// Assemble a Crawler with stub collaborators; options default to
// { promiseTrace: false } so tests do not emit promise trace logging.
// BUG FIX: the span contained both the old and new signature lines (a
// duplicate declaration); only the current signature is kept.
// NOTE(review): `locker = createBaseLocker` defaults to the factory function
// itself (missing `()`), unlike the other defaults — confirm intent.
function createBaseCrawler({normal = createBaseQueue(), priority = createBaseQueue(), deadLetter = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
  return new Crawler(normal, priority, deadLetter, store, locker, requestor, options, logger);
}
@ -1159,12 +1232,14 @@ function createBaseStore({etag = null, upsert = null, get = null} = {}) {
return result;
}
// Build a logger stub for tests: info/warn/error default to no-ops, while
// verbose and silly default to echoing to the console. level is 'silly' so
// all messages would pass a level check.
// BUG FIX: this span interleaved the old and new signature lines (a duplicate
// declaration); only the current signature (with `silly`) is kept.
function createBaseLog({info = null, warn = null, error = null, verbose = null, silly = null} = {}) {
  const result = {};
  result.info = info || (() => { });
  result.warn = warn || (() => { });
  result.error = error || (() => { });
  result.verbose = verbose || ((message) => { console.log(message) });
  result.silly = silly || ((message) => { console.log(message) });
  result.level = 'silly';
  return result;
}