Add support for reprocessing and update transitivity

This commit is contained in:
Jeff McAffer 2016-11-24 05:46:39 -08:00
Родитель 736895a20b
Коммит 93c6ba2a75
6 изменённых файлов: 409 добавлений и 63 удалений

Просмотреть файл

@ -215,7 +215,10 @@ class Crawler {
fetchType = 'page';
}
const self = this;
return this.store.etag(fetchType, request.url).then(etag => {
if (request.isReprocessing()) {
return this._reprocessingFetch(request, fetchType);
}
return this._getEtag(request, fetchType).then(etag => {
const options = etag ? { headers: { 'If-None-Match': etag } } : {};
const start = Date.now();
return self.requestor.get(request.url, options).then(githubResponse => {
@ -231,29 +234,20 @@ class Crawler {
const delay = 2 * 60 * 1000;
request.delayFor(delay);
request.addMeta({ forbiddenDelay: delay });
return request.markRequeue('GitHub throttled: ${request.url}');
return request.markRequeue(`GitHub throttled: ${request.url}`);
}
throw new Error(`Code: ${status} for: ${request.url}`);
}
request.response = githubResponse;
self._checkGitHubRateLimit(request, githubResponse);
if (status === 304) {
// We have the content for this element. If we are forcing, get the content from the
// store and process. Otherwise, skip.
if (!request.force) {
return request.markSkip('Unmodified');
if (request.isForced()) {
return self._fetchFromStore(request, fetchType);
}
return self.store.get(fetchType, request.url).then(document => {
// if the doc had stored headers (e.g., page responses) then reconstitute them for processing
if (document._metadata && document._metadata.headers) {
Object.assign(githubResponse.headers, document._metadata.headers);
}
request.document = document;
request.response = githubResponse;
// Our store is up to date so don't store
request.store = false;
return request;
});
return request.markSkip('Unmodified');
}
request.document = githubResponse.body;
request.response = githubResponse;
@ -262,6 +256,26 @@ class Crawler {
});
}
_getEtag(request, fetchType) {
if (request.isForcedFetch()) {
return Q(null);
}
return this.store.etag(fetchType, request.url);
}
_fetchFromStore(request, fetchType) {
return this.store.get(fetchType, request.url).then(document => {
// if the doc had stored headers (e.g., page responses) then reconstitute them for processing
if (document._metadata && document._metadata.headers) {
Object.assign(request.response.headers, document._metadata.headers);
}
request.document = document;
// Our store is up to date so don't store
request.store = false;
return request;
});
}
_convertToDocument(request) {
if (request.shouldSkip()) {
return Q(request);
@ -350,7 +364,8 @@ class Crawler {
// Create a new request data structure that has just the things we should queue
const queuable = new Request(request.type, request.url, request.context);
queuable.attemptCount = request.attemptCount;
queuable.force = request.force;
queuable.transitivity = request.transitivity;
queuable.fetch = request.fetch;
queuable.subType = request.subType;
return queuable;
}

Просмотреть файл

@ -5,6 +5,7 @@ const URL = require('url');
class Processor {
constructor() {
// Current document-schema version produced by this processor. During
// reprocessing, stored documents at or above this version are skipped.
this.version = 1;
}
process(request) {
@ -15,24 +16,36 @@ class Processor {
return request.document;
}
if (request.isReprocessing()) {
if (request.document._metadata.version === this.version) {
request.markSkip('Verbose', `Document for ${request.url} already at version: ${this.version}`);
return request.document;
}
if (request.document._metadata.version > this.version) {
request.markSkip('Warn', `Document version: ${request.document._metadata.version} is > requested version: ${this.version} for ${request.url}`);
return request.document;
}
}
const result = handler.call(this, request);
result._metadata.version = 1;
result._metadata.version = this.version;
return result;
}
collection(request) {
// if there are additional pages, queue them up to be processed. Note that these go
// on the high priority queue so they are loaded before they change much.
const linkHeader = request.response.headers.link;
const linkHeader = (request.response && request.response.headers) ? request.response.headers.link : null;
if (linkHeader) {
const links = parse(linkHeader);
const requests = [];
for (let i = 2; i <= links.last.page; i++) {
const url = request.url + `?page=${i}&per_page=100`;
const newRequest = new Request('page', url);
newRequest.force = request.force;
newRequest.subType = request.subType;
// Carry through this request's transitivity, fetch and qualifier for subsequent pages
newRequest.fetch = request.fetch;
newRequest.transitivity = request.transitivity;
newRequest.context = { qualifier: request.context.qualifier };
newRequest.subType = request.subType;
requests.push(newRequest);
}
// TODO this is a bit reachy. need a better way to efficiently queue up
@ -55,7 +68,7 @@ class Processor {
}
request.linkSelf('self', `${qualifier}:${type}:pages:${page}`);
document.elements.forEach(item => {
request.queueChild(type, item.url, qualifier);
request.queueCollectionElement(type, item.url, qualifier);
});
return document;
}
@ -63,19 +76,17 @@ class Processor {
org(request) {
const document = request.document;
request.addRootSelfLink();
request.linkSiblings('repos', `urn:org:${document.id}:repos`);
request.linkSiblings('repos', `urn:login:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:org');
// Queue this org's repos and force if we are being forced.
request.queueRoots('repos', document.repos_url, request.force);
// TODO is this "logins"
request.queueRoots('users', document.members_url.replace('{/member}', ''), request.force);
request.queueRoots('repos', document.repos_url);
request.queueRoots('users', document.members_url.replace('{/member}', ''));
return document;
}
user(request) {
const document = request.document;
request.addRootSelfLink();
request.linkSiblings('repos', `urn:user:${document.id}:repos`);
request.linkSiblings('repos', `urn:login:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:user');
request.queueRoots('repos', document.repos_url);
return document;
@ -166,7 +177,7 @@ class Processor {
request.addSelfLink();
request.linkSelf('user', `urn:login:${document.user.id}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`);
request.queue('login', document.user.url);
request.queueRoot('login', document.user.url);
return document;
}

Просмотреть файл

@ -1,9 +1,30 @@
const extend = require('extend');
/**
Requests describe a resource to capture and process as well as the context for that processing.
Transitivity
* none - Only process this exact resource
* normal - Process this resource if not previously seen and do normal processing on non-roots and roots
* forceNone - Process this resource and force processing on non-roots and no processing of roots
* forceNormal - Force processing of children plus normal processing of roots
* forceForce - Force processing of children and roots. Decays to forceNormal on roots
Basically, once you are forcing, force transitivity for all children, but still allow control over transitivity
when traversing to a root. When traversing with forceForce, queued roots end up as forceNormal. Similarly,
when traversing with forceNormal, queued roots end up as normal.
Fetch behavior
* none - Only use existing content. Skip this resource if we don't already have it
* normal - Use existing content if we have it and it matches. Otherwise, get content from original source
* force - Ignore existing content and get content from original source
*/
class Request {
constructor(type, url, context = null) {
// Resource type (e.g., 'org', 'repos', 'page'); selects the processor handler.
this.type = type;
this.url = url;
// Traversal policy: none | normal | forceNone | forceNormal | forceForce
// (see the header comment above for the decay rules).
this.transitivity = 'normal';
// Fetch policy: none (store only) | normal (etag-validated) | force (ignore cache).
this.fetch = 'normal';
this.context = context || {};
// Accumulates promises for work spawned by this request — presumably consumed
// by track()/completion handling elsewhere; TODO confirm against Crawler.
this.promises = [];
}
@ -30,7 +51,7 @@ class Request {
addSelfLink(key = 'id', base = null) {
let qualifier = base ? base : this.context.qualifier;
if (!qualifier || (typeof qualifier !== 'string' )) {
if (!qualifier || (typeof qualifier !== 'string')) {
console.log('bummer');
}
qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':';
@ -55,39 +76,83 @@ class Request {
this.track(this.crawler.queue(newRequest));
}
queueRoot(type, url, force = false) {
queueRoot(type, url) {
const transitivity = this._getRootTransitivity();
if (!transitivity) {
return;
}
const newRequest = new Request(type, url);
newRequest.context = { qualifier: 'urn:' };
newRequest.force = force;
// set the new request's transitivity to the next value
newRequest.transitivity = transitivity;
this.track(this.crawler.queue(newRequest));
}
queueRoots(type, url, force = false) {
queueRoots(type, url) {
const transitivity = this._getRootTransitivity();
if (!transitivity) {
return;
}
const newRequest = new Request(type, url);
const newContext = {};
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
newRequest.force = force;
newRequest.context = { qualifier: this.document._metadata.links.self.href };
// carry over this requests transitivity as we are queuing a collection
newRequest.transitivity = this.transitivity;
this.track(this.crawler.queue(newRequest));
}
queueCollectionElement(type, url, qualifier) {
if (this.isRootType(type)) {
return this.queueRoot(type, url);
}
return this.queueChild(type, url, qualifier);
}
queueChild(type, url, qualifier) {
const transitivity = this._getChildTransitivity();
if (!transitivity) {
return;
}
const newRequest = new Request(type, url);
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
newRequest.force = this.force;
newRequest.transitivity = transitivity;
this.track(this.crawler.queue(newRequest));
}
queueChildren(type, url, context = null) {
const transitivity = this._getChildTransitivity();
if (!transitivity) {
return;
}
const newRequest = new Request(type, url);
const newContext = extend(this.context || {}, context);
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
newRequest.force = this.force;
// carry over this requests transitivity as we are queuing a collection
newRequest.transitivity = this.transitivity;
this.track(this.crawler.queue(newRequest));
}
_getRootTransitivity() {
return { normal: 'normal', forceNormal: 'normal', forceForce: 'forceNormal' }[this.transitivity];
}
_getChildTransitivity() {
return { normal: 'normal', forceNone: 'forceNone', forceNormal: 'forceNormal', forceForce: 'forceNormal' }[this.transitivity];
}
isReprocessing() {
// fetch === 'none' means: work only from stored content (reprocessing mode).
return this.fetch === 'none';
}
isForced() {
// Any of the force* transitivity values (forceNone/forceNormal/forceForce)
// indicates processing is forced.
return this.transitivity.startsWith('force');
}
isForcedFetch() {
// fetch === 'force' means: ignore cached/etag content and refetch from source.
return this.fetch === 'force';
}
markSkip(outcome, message) {
if (this.shouldSkip()) {
return this;
@ -159,6 +224,11 @@ class Request {
};
return collections[this.type];
}
isRootType(type) {
const roots = new Set(['orgs', 'org', 'repos', 'repo', 'teams', 'team', 'users', 'user']);
return roots.has(type);
}
}
module.exports = Request;

Просмотреть файл

@ -249,7 +249,7 @@ describe('Crawler fetch', () => {
it('should return cached content and not save and response for 304 with force', () => {
const url = 'http://test';
const request = new Request('repos', url);
request.force = true;
request.transitivity = 'forceNormal';
let getArgs = null;
const responses = [createResponse(null, 304, 42)];
const requestor = createBaseRequestor({
@ -270,7 +270,7 @@ describe('Crawler fetch', () => {
it('should return cached content and headers for 304 with force', () => {
const url = 'http://test';
const request = new Request('repos', url);
request.force = true;
request.transitivity = 'forceNormal';
let getArgs = null;
const responses = [createResponse(null, 304, 42)];
const requestor = createBaseRequestor({
@ -297,7 +297,6 @@ describe('Crawler fetch', () => {
const crawler = createBaseCrawler({ requestor: requestor, store: store });
return crawler._fetch(request).then(request => {
expect(request.document).to.be.undefined;
expect(request.response).to.be.undefined;
expect(request.shouldSkip()).to.be.true;
});
});
@ -343,7 +342,7 @@ describe('Crawler fetch', () => {
it('should throw for store get errors', () => {
const request = new Request('repos', 'http://test');
request.force = true;
request.transitivity = 'forceNormal';
const responses = [createResponse(null, 304, 42)];
const requestor = createBaseRequestor({ get: () => { return Q(responses.shift()); } });
const store = createBaseStore({ etag: () => { return Q(42); }, get: () => { throw new Error('test'); } });
@ -1265,33 +1264,12 @@ function create304Response(etag) {
};
}
// Build a simulated GitHub paged response: rate-limit headers, a Link header
// describing prev/next/last pages, a status code and a body.
function createMultiPageResponse(target, body, previous, next, last, code = 200, error = null, remaining = 4000, reset = null) {
  const headers = {
    'x-ratelimit-remaining': remaining,
    'x-ratelimit-reset': reset ? reset : 0,
    link: createLinkHeader(target, previous, next, last)
  };
  return { headers: headers, statusCode: code, body: body };
}
// Wrap the given message in an Error object, mimicking a transport-level
// failure response from the requestor.
function createErrorResponse(error) {
  const wrapped = new Error(error);
  return { error: wrapped };
}
// Build a GitHub-style Link header string for the given page numbers.
// Fix: `separator` was assigned without a declaration, creating an implicit
// global (a ReferenceError under strict mode / ES modules).
function createLinkHeader(target, previous, next, last) {
  // Use '&' when the target already carries a query string, '?' otherwise.
  const separator = target.includes('?') ? '&' : '?';
  const firstLink = null; //`<${urlHost}/${target}${separator}page=1>; rel="first"`;
  const prevLink = previous ? `<${urlHost}/${target}${separator}page=${previous}>; rel="prev"` : null;
  const nextLink = next ? `<${urlHost}/${target}${separator}page=${next}>; rel="next"` : null;
  const lastLink = last ? `<${urlHost}/${target}${separator}page=${last}>; rel="last"` : null;
  return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
}
// Assemble a Crawler from the given collaborators, defaulting each to a
// test double. Fix: `locker` defaulted to the factory function itself
// (`createBaseLocker`) instead of its result — every other default invokes
// its factory, so the missing parentheses were a bug.
function createBaseCrawler({queues = createBaseQueues(), store = createBaseStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
  return new Crawler(queues, store, locker, requestor, options, logger);
}

163
test/processorTests.js Normal file
Просмотреть файл

@ -0,0 +1,163 @@
const assert = require('chai').assert;
const chai = require('chai');
const expect = require('chai').expect;
const Processor = require('../lib/processor.js');
const Request = require('../lib/request.js');
const sinon = require('sinon');
// Reprocessing: with fetch === 'none' (Request.isReprocessing), Processor.process
// compares the stored document's _metadata.version against the processor's
// current version and only runs the type handler for older documents.
describe('Processor reprocessing', () => {
it('will skip if at same version', () => {
const processor = new Processor();
const request = new Request('user', 'http://test.com/users/user1');
// fetch === 'none' puts the request in reprocessing mode.
request.fetch = 'none';
request.document = { _metadata: { version: processor.version } };
// Stub the 'user' handler so we can assert it is never invoked.
sinon.stub(processor, 'user', () => { });
processor.process(request);
expect(request.shouldSkip()).to.be.true;
expect(processor.user.callCount).to.be.equal(0);
});
it('will skip and warn if at greater version', () => {
const processor = new Processor();
const request = new Request('user', 'http://test.com/users/user1');
request.fetch = 'none';
// Document claims a version newer than this processor understands.
request.document = { _metadata: { version: processor.version + 1 } };
sinon.stub(processor, 'user', () => { });
processor.process(request);
expect(request.shouldSkip()).to.be.true;
expect(request.outcome).to.be.equal('Warn');
expect(processor.user.callCount).to.be.equal(0);
});
it('will process and update if at lesser version', () => {
const processor = new Processor();
const request = new Request('user', 'http://test.com/users/user1');
request.fetch = 'none';
// Older document: the handler should run and stamp the current version.
request.document = { _metadata: { version: processor.version - 1 } };
sinon.stub(processor, 'user', () => { return request.document; });
const document = processor.process(request);
expect(request.shouldSkip()).to.be.false;
expect(processor.user.callCount).to.be.equal(1);
expect(document._metadata.version).to.be.equal(processor.version);
});
});
describe('Collection processing', () => {
it('should queue forceNormal normal collection pages as forceNormal and elements as forceNormal', () => {
const request = new Request('issues', 'http://test.com/issues');
request.transitivity = 'forceNormal';
request.subType = 'issue';
request.response = {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'issue', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceNormal');
expect(newPages[0].url).to.be.equal('http://test.com/issues?page=2&per_page=100');
expect(newPages[0].type).to.be.equal('page');
expect(newPages[0].subType).to.be.equal('issue');
expect(request.crawler.queue.callCount).to.be.equal(1);
const newRequest = request.crawler.queue.getCall(0).args[0];
expect(newRequest.transitivity).to.be.equal('forceNormal');
expect(newRequest.url).to.be.equal('http://child1');
expect(newRequest.type).to.be.equal('issue');
});
it('should queue forceNormal root collection pages as forceNormal and elements as normal', () => {
const request = new Request('collection', 'http://test.com/orgs');
request.transitivity = 'forceNormal';
request.subType = 'org';
request.response = {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'org', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceNormal');
expect(newPages[0].url).to.be.equal('http://test.com/orgs?page=2&per_page=100');
expect(newPages[0].type).to.be.equal('page');
expect(newPages[0].subType).to.be.equal('org');
expect(request.crawler.queue.callCount).to.be.equal(1);
const newRequest = request.crawler.queue.getCall(0).args[0];
expect(newRequest.transitivity).to.be.equal('normal');
expect(newRequest.url).to.be.equal('http://child1');
expect(newRequest.type).to.be.equal('org');
});
it('should queue forceForce root collection pages as forceForce and elements as forceNormal', () => {
const request = new Request('collection', 'http://test.com/orgs');
request.transitivity = 'forceForce';
request.subType = 'org';
request.response = {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'org', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceForce');
expect(newPages[0].url).to.be.equal('http://test.com/orgs?page=2&per_page=100');
expect(newPages[0].type).to.be.equal('page');
expect(newPages[0].subType).to.be.equal('org');
expect(request.crawler.queue.callCount).to.be.equal(1);
const newRequest = request.crawler.queue.getCall(0).args[0];
expect(newRequest.transitivity).to.be.equal('forceNormal');
expect(newRequest.url).to.be.equal('http://child1');
expect(newRequest.type).to.be.equal('org');
});
it('should queue forceForce page elements with forceNormal transitivity', () => {
const request = new Request('page', 'http://test.com/orgs?page=2&per_page=100');
request.transitivity = 'forceForce';
request.subType = 'org';
request.document = { _metadata: { links: {} }, elements: [{ url: 'http://child1' }] };
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
const processor = new Processor();
processor.page(request);
expect(request.crawler.queue.callCount).to.be.equal(1);
const newRequest = request.crawler.queue.getCall(0).args[0];
expect(newRequest.transitivity).to.be.equal('forceNormal');
expect(newRequest.url).to.be.equal('http://child1');
expect(newRequest.type).to.be.equal('org');
});
});
// Build a GitHub-style Link header string for the given page numbers.
// Fix: `separator` was assigned without a declaration, creating an implicit
// global (a ReferenceError under strict mode / ES modules).
function createLinkHeader(target, previous, next, last) {
  // Use '&' when the target already carries a query string, '?' otherwise.
  const separator = target.includes('?') ? '&' : '?';
  const firstLink = null; // first link intentionally omitted from generated headers
  const prevLink = previous ? `<${target}${separator}page=${previous}>; rel="prev"` : null;
  const nextLink = next ? `<${target}${separator}page=${next}>; rel="next"` : null;
  const lastLink = last ? `<${target}${separator}page=${last}>; rel="last"` : null;
  return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
}

109
test/requestTests.js Normal file
Просмотреть файл

@ -0,0 +1,109 @@
const assert = require('chai').assert;
const chai = require('chai');
const expect = require('chai').expect;
const Request = require('../lib/request.js');
const sinon = require('sinon');
describe('Request transitivity', () => {
it('will not queueRoot if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will queueRoot normal if normal transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'normal';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('normal');
});
it('will not queueRoot if forceNone transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'forceNone';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will queueRoot normal if forceNormal transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'forceNormal';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('normal');
});
it('will queueRoot forceNormal if forceForce transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'forceForce';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('forceNormal');
});
it('will not queueChild if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueChild('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will queueChild normal if normal transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'normal';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueChild('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('normal');
});
it('will queueChild force if force transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'forceNone';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueChild('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('forceNone');
});
// Fix: test description misspelled "foceNormal".
it('will queueChild forceNormal if forceNormal transitivity', () => {
  const request = new Request('user', 'http://test.com/users/user1');
  request.transitivity = 'forceNormal';
  request.crawler = { queue: () => { } };
  sinon.spy(request.crawler, 'queue');
  request.queueChild('foo', 'http://');
  expect(request.crawler.queue.callCount).to.be.equal(1);
  expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('forceNormal');
});
// Fix: test description misspelled "foceNormal".
it('will queueChild forceNormal if forceForce transitivity', () => {
  const request = new Request('user', 'http://test.com/users/user1');
  request.transitivity = 'forceForce';
  request.crawler = { queue: () => { } };
  sinon.spy(request.crawler, 'queue');
  request.queueChild('foo', 'http://');
  expect(request.crawler.queue.callCount).to.be.equal(1);
  expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('forceNormal');
});
});
// TODO: placeholder suite — the test below has an empty body and a description
// copied from the transitivity suite; fill in real context/qualifier assertions.
describe('Request context/qualifier', () => {
it('will not queueRoot if none transitivity', () => {
});
});