Merge pull request #8 from Microsoft/jm/refactorCrawler

Break crawler into three parts
This commit is contained in:
Jeff McAffer 2016-11-11 11:13:48 -08:00 коммит произвёл GitHub
Родитель 1445cf8c86 b5df9f166b
Коммит 3c64e42c78
4 изменённых файлов: 538 добавлений и 455 удалений

Просмотреть файл

@ -1,3 +1,5 @@
module.exports.eventFinder = require('./lib/eventFinder');
module.exports.webhookDriver = require('./lib/webhookDriver');
module.exports.crawler = require('./lib/crawler');
module.exports.request = require('./lib/request');
module.exports.processor = require('./lib/processor');

Просмотреть файл

@ -1,47 +1,84 @@
const extend = require('extend');
const moment = require('moment');
const parse = require('parse-link-header');
const Processor = require('./processor');
const Q = require('q');
const URL = require('url');
const collections = {
orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user'
};
class Crawler {
constructor(queue, priorityQueue, store, requestor, config, logger) {
this.queue = queue;
this.normalQueue = queue;
this.priorityQueue = priorityQueue;
this.store = store;
this.requestor = requestor;
this.config = config;
this.logger = logger;
this.processor = new Processor();
}
start() {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.queue))
.then(this._trackStart.bind(this))
let requestBox = [];
return this._getRequest(requestBox)
.then(this._filter.bind(this))
.then(this._fetch.bind(this))
.then(this._convertToDocument.bind(this))
.then(this._processDocument.bind(this))
.then(this._storeDocument.bind(this))
.then(this._deleteFromQueue.bind(this))
.then(this._logOutcome.bind(this))
.catch(this._requeueHandler.bind(this, requestBox))
.then(this._completeRequest.bind(this))
.then(this._startNext.bind(this))
.catch(error => {
this.logger.log('error', `${error.message}`);
this.logger.log('Error', error);
});
}
_pop(queue, request = null) {
return request ? Q(request) : queue.pop();
_requeueHandler(requestBox, error) {
return requestBox[0].markRequeue(error);
}
_trackStart(request) {
request.start = Date.now();
return Q(request);
_getRequest(requestBox) {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
if (!request) {
request = new request('wait', null);
request.delay = 1000;
request.markSkip('Exhausted queue', `Waiting ${request.delay}ms`);
}
request.start = Date.now();
requestBox[0] = request;
return request;
});
}
_completeRequest(request) {
if (request.shouldRequeue()) {
this.logger.log('Info', `Requeuing request ${request.type} for ${request.url}`);
this.queue(request, request);
}
return this._deleteFromQueue(request)
.then(this._wait.bind(this))
.then(this._logOutcome.bind(this));
}
_wait(request) {
if (!request.delay) {
return Q(request);
}
const result = Q.defer();
setTimeout(() => { result.resolve(request); }, request.delay);
return result;
}
_pop(queue, request = null) {
const self = this;
return (request ? Q(request) : queue.pop()).then(result => {
if (result) {
result.crawler = self;
result.originQueue = queue;
}
return result;
});
}
_startNext() {
@ -50,19 +87,19 @@ class Crawler {
_filter(request) {
if (this._configFilter(request.type, request.url)) {
this._markSkip(request, 'Filtered');
request.markSkip('Filtered');
}
return Q.resolve(request);
}
_fetch(request) {
if (request.skip) {
if (request.shouldSkip()) {
return Q.resolve(request);
}
// rewrite the request type for collections remember the collection subType
// Also setup 'page' as the document type to look up for etags etc.
let fetchType = request.type;
let subType = collections[request.type];
let subType = request.getCollectionType();
if (subType) {
request.type = 'collection';
request.subType = subType;
@ -74,17 +111,19 @@ class Crawler {
const start = Date.now();
return self.requestor.get(request.url, options).then(githubResponse => {
const status = githubResponse.statusCode;
this._addMeta(request, { status: status, fetch: Date.now() - start });
request.addMeta({ status: status, fetch: Date.now() - start });
if (status !== 200 && status !== 304) {
self._markSkip(request, 'Error', new Error(`Code: ${status} for: ${request.url}`));
return request;
if (status === 409) {
return request.markSkip('Error', new Error(`Code: ${status} for: ${request.url}`));
}
return request.markRequeue();
}
if (status === 304 && githubResponse.headers.etag === etag) {
// We have the content for this element. If it is immutable, skip.
// Otherwise get it from the store and process.
if (!request.force) {
return self._markSkip(request, 'Unmodified');
return request.markSkip('Unmodified');
}
return self.store.get(fetchType, request.url).then(document => {
request.document = document;
@ -100,12 +139,12 @@ class Crawler {
});
}).catch(error => {
// TODO can this request be requeued?
return this._markSkip(request, 'Error', error);
return request.markSkip('Error', error);
});
}
_convertToDocument(request) {
if (request.skip) {
if (request.shouldSkip()) {
return Q.resolve(request);
}
@ -125,22 +164,23 @@ class Crawler {
}
_processDocument(request) {
if (request.skip) {
if (request.shouldSkip()) {
return Q.resolve(request);
}
const handler = this[request.type];
let handler = this.processor[request.type];
handler = handler || this[request.type];
if (!handler) {
this._markSkip(request, 'Warning', `No handler found for request type: ${request.type}`);
request.markSkip('Warning', `No handler found for request type: ${request.type}`);
return request;
}
request.document = handler.call(this, request);
request.document = handler.call(this.processor, request);
return Q.resolve(request);
}
_storeDocument(request) {
// See if we should skip storing the document. Test request.store explicitly for false as it may just not be set.
if (request.skip || !this.store || !request.document || request.store === false) {
if (request.shouldSkip() || !this.store || !request.document || request.store === false) {
return Q.resolve(request);
}
@ -154,7 +194,7 @@ class Crawler {
if (!request.message) {
return Q.resolve(request);
}
return this.queue.done(request).then(() => { return request; });
return this.normalQueue.done(request).then(() => { return request; });
}
_logOutcome(request) {
@ -162,416 +202,23 @@ class Crawler {
if (outcome === 'Error') {
this.logger.log(outcome, request.message);
} else {
this._addMeta(request, { total: Date.now() - request.start });
request.addMeta({ total: Date.now() - request.start });
this.logger.log('info', `${outcome} ${request.type} [${request.url}] ${request.message || ''}`, request.meta);
}
return request;
}
_addMeta(request, data) {
request.meta = extend({}, request.meta, data);
return request;
}
// =============== Entity Processors ============
collection(request) {
// if there are additional pages, queue them up to be processed. Note that these go
// on the high priority queue so they are loaded before they change much.
const linkHeader = request.response.headers.link;
if (linkHeader) {
const links = parse(linkHeader);
for (let i = 2; i <= links.last.page; i++) {
const url = request.url + `?page=${i}&per_page=100`;
const context = { qualifier: request.context.qualifier };
this._queueBase(request, { type: 'page', url: url, subType: request.subType, page: i, force: request.force, context: context }, this.priorityQueue);
}
}
// Rewrite the request and document to be a 'page' and then process.
request.page = 1;
request.document._metadata.type = 'page';
return this.page(request);
}
page(request) {
const document = request.document;
const type = request.subType;
const first = document.elements[0];
const qualifier = request.context.qualifier;
this._linkSelf(request, 'self', `${qualifier}:${type}:pages:${request.page}`);
document.elements.forEach(item => {
this._queueChild(request, type, item.url, qualifier);
});
return document;
}
org(request) {
const document = request.document;
this._addSelfLink(request, 'urn:');
this._linkSiblings(request, 'repos', `urn:org:${document.id}:repos`);
this._linkSiblings(request, 'siblings', 'urn:org');
this._queueChildren(request, 'repos', document.repos_url);
// TODO is this "logins"
this._queueChildren(request, 'users', document.members_url.replace('{/member}', ''));
return document;
}
user(request) {
const document = request.document;
this._addSelfLink(request, 'urn:');
this._linkSiblings(request, 'repos', `urn:user:${document.id}:repos`);
this._linkSiblings(request, 'siblings', 'urn:user');
this._queueChildren(request, 'repos', document.repos_url);
return document;
}
repo(request) {
const document = request.document;
this._addSelfLink(request, 'urn:');
this._linkSelf(request, 'owner', `urn:login:${document.owner.id}`);
this._linkSelf(request, 'parent', `urn:login:${document.owner.id}`);
this._linkSiblings(request, 'siblings', `urn:login:${document.owner.id}:repos`);
this._queueRoot(request, 'login', document.owner.url);
this._queueChildren(request, 'issues', document.issues_url.replace('{/number}', ''), { repo: document.id });
this._queueChildren(request, 'commits', document.commits_url.replace('{/sha}', ''), { repo: document.id });
return document;
}
commit(request) {
const document = request.document;
const context = request.context;
this._addSelfLink(request, null, 'sha');
this._linkSelf(request, 'repo', `urn:repo:${context.repo}`);
this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:commits`);
// TODO not sure what the following line does
// document._metadata.links.parent = document._metadata.links.parent;
if (document.author) {
this._linkSelf(request, 'author', `urn:login:${document.author.id}`);
this._queueRoot(request, 'login', document.author.url);
}
if (document.committer) {
this._linkSelf(request, 'committer', `urn:login:${document.committer.id}`);
this._queueRoot(request, 'login', document.committer.url);
}
if (document.files) {
document.files.forEach(file => {
delete file.patch;
});
}
return document;
}
login(request) {
const document = request.document;
this._addSelfLink(request, 'urn:');
this._linkSelf(request, 'self', `urn:login:${document.id}`);
// TODO should we do repos here and in the user/org?
this._linkSiblings(request, 'repos', `urn:login:${document.id}:repos`);
this._linkSiblings(request, 'siblings', 'urn:login');
if (document.type === 'Organization') {
this._queueRoot(request, 'org', `https://api.github.com/orgs/${document.login}`);
} else if (document.type === 'User') {
this._queueRoot(request, 'user', `https://api.github.com/users/${document.login}`);
}
this._queueChildren(request, 'repos', document.repos_url);
return document;
}
issue(request) {
const document = request.document;
const context = request.context;
this._addSelfLink(request);
this._linkSelf(request, 'assignees', document.assignees.map(assignee => { return `urn:login:${assignee.id}`; }));
this._linkSelf(request, 'repo', `urn:repo:${context.repo}`);
this._linkSelf(request, 'parent', `urn:repo:${context.repo}`);
this._linkSelf(request, 'user', `urn:login:${document.user.id}`);
this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:issues`);
this._queueRoot(request, 'login', document.user.url);
if (document.assignee) {
this._linkSelf(request, 'assignee', `urn:login:${document.assignee.id}`);
this._queueRoot(request, 'login', document.assignee.url);
}
if (document.closed_by) {
this._linkSelf(request, 'closed_by', `urn:login:${document.closed_by.id}`);
this._queueRoot(request, 'login', document.closed_by.url);
}
// milestone
// pull request
// events
// labels
this._queueChildren(request, 'issue_comments', document.comments_url, { issue: document.id, repo: context.repo });
return document;
}
issue_comment(request) {
const document = request.document;
const context = request.context;
this._addSelfLink(request);
this._linkSelf(request, 'user', `urn:login:${document.user.id}`);
this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`);
this._queue(request, 'login', document.user.url);
return document;
}
team(request) {
const document = request.document;
this._addSelfLink(request, `urn:org:${document.organization.id}`);
this._linkSelf(request, 'org', `urn:org:${document.organization.id}`);
this._linkSelf(request, 'login', `urn:login:${document.organization.id}`);
this._linkSiblings(request, 'siblings', `urn:org:${document.organization.id}:teams`);
this._queueChildren(request, 'team_members', document.members_url);
this._queueChildren(request, 'team_repos', document.repositories_url);
return document;
}
team_members(request) {
const document = request.document;
this._addSelfLink(request, `urn:org:${document.organization.id}`);
return document;
}
team_repos(request) {
this._addSelfLink(request, `urn:org:${document.organization.id}`);
return document;
}
// =============== Event Processors ============
CommitCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(request);
this._linkSelf(request, 'comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`);
// TODO siblings?
this._queue(request, 'comment', payload.comment.url);
return document;
}
CreateEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
return document;
}
DeleteEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
// TODO do something for interesting deletions e.g., where ref-type === 'repository'
return document;
}
DeploymentEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`);
this._queue(request, 'deployment', payload.deployment.url);
return document;
}
DeploymentStatusEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'deployment_status', `urn:repo:${context.repo}:deployment:${payload.deployment.id}:status:${payload.deployment_status.id}`);
this._linkSelf(request, 'deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`);
this._queue(request, 'deployment', payload.deployment.url);
return document;
}
ForkEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
// TODO figure out what else to do
return document;
}
GollumEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
return document;
}
IssueCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`);
this._linkSelf(request, 'comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`);
this._queue(request, 'comment', payload.comment.url);
this._queue(request, 'issue', payload.issue.url);
return document;
}
IssuesEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`);
this._queue(request, 'issue', payload.issue.url);
return document;
}
LabelEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
return document;
}
MemberEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'member', `urn:login:${payload.member.id}`);
this._queueRoot(request, 'login', payload.member.url);
return document;
}
MembershipEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'self', `urn:team:${payload.team.id}:membership_event:${document.id}`);
this._linkSelf(request, 'member', `urn:login:${payload.member.id}`);
this._linkSelf(request, 'team', `urn:team:${payload.team.id}`);
this._linkSelf(request, 'org', `urn:org:${payload.organization.id}`);
this._queueRoot(request, 'login', payload.member.url);
this._queueRoot(request, 'org', payload.organization.url);
this._queue(request, 'team', payload.team.url);
return document;
}
MilestoneEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'milestone', `urn:repo:${context.repo}:milestone:${payload.milestone.id}`);
this._queue(request, 'milestone', payload.milestone.url);
return document;
}
PageBuildEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'page_build', `urn:repo:${context.repo}:page_builds:${payload.id}`);
this._queue(request, 'page_build', payload.build.url);
return document;
}
PublicEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
return document;
}
PullRequestEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
this._queue(request, 'pull', payload.pull_request.url);
return document;
}
PullRequestReviewEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'review', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:review:${payload.review.id}`);
this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
this._queue(request, 'pull_review', payload.pull_request.review_comment_url.replace('{/number}', `/${payload.review.id}`));
this._queue(request, 'pull', payload.pull_request.url);
return document;
}
PullRequestReviewCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
this._linkSelf(request, 'comment', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:comment:${payload.comment.id}`);
this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
// TODO see if all the various comments can be the same type
this._queue(request, 'pull_comment', payload.comment.url);
this._queue(request, 'pull', payload.pull_request.url);
return document;
}
PushEvent(request) {
const document = request.document;
const context = request.context;
const payload = this._eventHelper(document);
// TODO figure out what to do with the commits
return document;
}
// =============== Helpers ============
_addSelfLink(request, base = null, key = 'id') {
let qualifier = base ? base : request.context.qualifier;
qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':';
this._linkSelf(request, 'self', `${qualifier}${request.type}:${request.document[key]}`);
}
_linkSelf(request, name, value) {
const links = request.document._metadata.links;
const key = Array.isArray(value) ? 'hrefs' : 'href';
links[name] = { type: 'self' };
links[name][key] = value;
}
_linkSiblings(request, name, href) {
const links = request.document._metadata.links;
links[name] = { type: 'siblings', href: href };
}
_queue(request, type, url, context, queue = null) {
const newRequest = { type: type, url: url };
newRequest.context = context;
this._queueBase(request, newRequest, queue);
}
_queueRoot(request, type, url) {
this._queueBase(request, { type: type, url: url });
}
_queueChild(request, type, url, qualifier) {
const newRequest = { type: type, url: url };
newRequest.context = request.context || {};
newRequest.context.qualifier = qualifier;
if (request.force) {
newRequest.force = request.force;
}
this._queueBase(request, newRequest);
}
_queueChildren(request, type, url, context = null) {
const newRequest = { type: type, url: url };
const newContext = extend(request.context || {}, context);
newRequest.context = newContext;
newContext.qualifier = request.document._metadata.links.self.href;
if (request.force) {
newRequest.force = request.force;
}
this._queueBase(request, newRequest);
}
// TODO make a queue all and add promises (then) to the code below
_queueBase(request, newRequest, queue = null) {
queue(request, newRequest, queue = null) {
if (this._configFilter(newRequest.type, newRequest.url)) {
this.logger.log('info', `Skipped queuing ${newRequest.type} [${newRequest.url}]`);
return;
}
queue = queue || this.queue;
queue = queue || this.normalQueue;
request.promises.push(queue.push(newRequest));
return request;
}
_configFilter(type, target) {
@ -585,32 +232,6 @@ class Crawler {
}
return false;
}
_markSkip(request, outcome, message) {
request.skip = true;
request.outcome = request.outcome || outcome;
request.message = request.message || message;
return request;
}
_eventHelper(request, references) {
const document = request.document;
// TODO understand if the actor is typically the same as the creator or pusher in the payload
const repo = document.repo ? document.repo.id : null;
const urn = repo ? `urn:repo:${repo}` : `urn:org:${document.org.id}`;
this._linkSelf(request, 'self', `${urn}:${request.type}:${document.id}`);
this._linkSelf(request, 'actor', `urn:login:${document.actor.id}`);
this._linkSelf(request, 'repo', `urn:repo:${document.repo.id}`);
this._linkSelf(request, 'org', `urn:org:${document.org.id}`);
this._queueRoot(request, 'login', document.actor.url);
this._queueRoot(request, 'repo', document.repo.url);
this._queueRoot(request, 'org', document.org.url);
return document.payload;
}
_isCollectionRequest(request) {
return collections.hasOwnProperty(request.type);
}
}
module.exports = Crawler;

351
lib/processor.js Normal file
Просмотреть файл

@ -0,0 +1,351 @@
const parse = require('parse-link-header');
const Request = require('./request');
class Processor {
constructor() {
}
collection(request) {
// if there are additional pages, queue them up to be processed. Note that these go
// on the high priority queue so they are loaded before they change much.
const linkHeader = request.response.headers.link;
if (linkHeader) {
const links = parse(linkHeader);
for (let i = 2; i <= links.last.page; i++) {
const url = request.url + `?page=${i}&per_page=100`;
const newRequest = Request.create('page', url);
newRequest.force = request.force;
newRequest.page = i;
newRequest.subType = request.subType;
newRequest.context = { qualifier: request.context.qualifier };
request.crawler.queue(request, newRequest, request.crawler.priorityQueue);
}
}
// Rewrite the request and document to be a 'page' and then process.
request.page = 1;
request.document._metadata.type = 'page';
return this.page(request);
}
page(request) {
const document = request.document;
const type = request.subType;
const first = document.elements[0];
const qualifier = request.context.qualifier;
request.linkSelf('self', `${qualifier}:${type}:pages:${request.page}`);
document.elements.forEach(item => {
request.queueChild(type, item.url, qualifier);
});
return document;
}
org(request) {
const document = request.document;
request.addSelfLink('urn:');
request.linkSiblings('repos', `urn:org:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:org');
request.queueChildren('repos', document.repos_url);
// TODO is this "logins"
request.queueChildren('users', document.members_url.replace('{/member}', ''));
return document;
}
user(request) {
const document = request.document;
request.addSelfLink('urn:');
request.linkSiblings('repos', `urn:user:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:user');
request.queueChildren('repos', document.repos_url);
return document;
}
repo(request) {
const document = request.document;
request.addSelfLink('urn:');
request.linkSelf('owner', `urn:login:${document.owner.id}`);
request.linkSelf('parent', `urn:login:${document.owner.id}`);
request.linkSiblings('siblings', `urn:login:${document.owner.id}:repos`);
request.queueRoot('login', document.owner.url);
request.queueChildren('issues', document.issues_url.replace('{/number}', ''), { repo: document.id });
request.queueChildren('commits', document.commits_url.replace('{/sha}', ''), { repo: document.id });
return document;
}
commit(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(null, 'sha');
request.linkSelf('repo', `urn:repo:${context.repo}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:commits`);
// TODO not sure what the following line does
// document._metadata.links.parent = document._metadata.links.parent;
if (document.author) {
request.linkSelf('author', `urn:login:${document.author.id}`);
request.queueRoot('login', document.author.url);
}
if (document.committer) {
request.linkSelf('committer', `urn:login:${document.committer.id}`);
request.queueRoot('login', document.committer.url);
}
if (document.files) {
document.files.forEach(file => {
delete file.patch;
});
}
return document;
}
login(request) {
const document = request.document;
request.addSelfLink('urn:');
request.linkSelf('self', `urn:login:${document.id}`);
// TODO should we do repos here and in the user/org?
request.linkSiblings('repos', `urn:login:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:login');
if (document.type === 'Organization') {
request.queueRoot('org', `https://api.github.com/orgs/${document.login}`);
} else if (document.type === 'User') {
request.queueRoot('user', `https://api.github.com/users/${document.login}`);
}
request.queueChildren('repos', document.repos_url);
return document;
}
issue(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(request);
request.linkSelf('assignees', document.assignees.map(assignee => { return `urn:login:${assignee.id}`; }));
request.linkSelf('repo', `urn:repo:${context.repo}`);
request.linkSelf('parent', `urn:repo:${context.repo}`);
request.linkSelf('user', `urn:login:${document.user.id}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:issues`);
request.queueRoot('login', document.user.url);
if (document.assignee) {
request.linkSelf('assignee', `urn:login:${document.assignee.id}`);
request.queueRoot('login', document.assignee.url);
}
if (document.closed_by) {
request.linkSelf('closed_by', `urn:login:${document.closed_by.id}`);
request.queueRoot('login', document.closed_by.url);
}
// milestone
// pull request
// events
// labels
request.queueChildren('issue_comments', document.comments_url, { issue: document.id, repo: context.repo });
return document;
}
issue_comment(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(request);
request.linkSelf('user', `urn:login:${document.user.id}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`);
request.queue('login', document.user.url);
return document;
}
team(request) {
const document = request.document;
request.addSelfLink(`urn:org:${document.organization.id}`);
request.linkSelf('org', `urn:org:${document.organization.id}`);
request.linkSelf('login', `urn:login:${document.organization.id}`);
request.linkSiblings('siblings', `urn:org:${document.organization.id}:teams`);
request.queueChildren('team_members', document.members_url);
request.queueChildren('team_repos', document.repositories_url);
return document;
}
team_members(request) {
const document = request.document;
request.addSelfLink(`urn:org:${document.organization.id}`);
return document;
}
team_repos(request) {
request.addSelfLink(`urn:org:${document.organization.id}`);
return document;
}
// =============== Event Processors ============
CommitCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(request);
request.linkSelf('comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`);
// TODO siblings?
request.queue('comment', payload.comment.url);
return document;
}
CreateEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
return document;
}
DeleteEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
// TODO do something for interesting deletions e.g., where ref-type === 'repository'
return document;
}
DeploymentEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`);
request.queue('deployment', payload.deployment.url);
return document;
}
DeploymentStatusEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('deployment_status', `urn:repo:${context.repo}:deployment:${payload.deployment.id}:status:${payload.deployment_status.id}`);
request.linkSelf('deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`);
request.queue('deployment', payload.deployment.url);
return document;
}
ForkEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
// TODO figure out what else to do
return document;
}
GollumEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
return document;
}
IssueCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`);
request.linkSelf('comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`);
request.queue('comment', payload.comment.url);
request.queue('issue', payload.issue.url);
return document;
}
IssuesEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`);
request.queue('issue', payload.issue.url);
return document;
}
LabelEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
return document;
}
MemberEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('member', `urn:login:${payload.member.id}`);
request.queueRoot('login', payload.member.url);
return document;
}
MembershipEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('self', `urn:team:${payload.team.id}:membership_event:${document.id}`);
request.linkSelf('member', `urn:login:${payload.member.id}`);
request.linkSelf('team', `urn:team:${payload.team.id}`);
request.linkSelf('org', `urn:org:${payload.organization.id}`);
request.queueRoot('login', payload.member.url);
request.queueRoot('org', payload.organization.url);
request.queue('team', payload.team.url);
return document;
}
MilestoneEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('milestone', `urn:repo:${context.repo}:milestone:${payload.milestone.id}`);
request.queue('milestone', payload.milestone.url);
return document;
}
PageBuildEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('page_build', `urn:repo:${context.repo}:page_builds:${payload.id}`);
request.queue('page_build', payload.build.url);
return document;
}
PublicEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
return document;
}
PullRequestEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
request.queue('pull', payload.pull_request.url);
return document;
}
PullRequestReviewEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('review', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:review:${payload.review.id}`);
request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
request.queue('pull_review', payload.pull_request.review_comment_url.replace('{/number}', `/${payload.review.id}`));
request.queue('pull', payload.pull_request.url);
return document;
}
PullRequestReviewCommentEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
request.linkSelf('comment', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:comment:${payload.comment.id}`);
request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`);
// TODO see if all the various comments can be the same type
request.queue('pull_comment', payload.comment.url);
request.queue('pull', payload.pull_request.url);
return document;
}
PushEvent(request) {
const document = request.document;
const context = request.context;
const payload = request.eventHelper(document);
// TODO figure out what to do with the commits
return document;
}
}
module.exports = Processor;

109
lib/request.js Normal file
Просмотреть файл

@ -0,0 +1,109 @@
const extend = require('extend');
class Request {
static create(type, url) {
const result = new Request();
result.type = type;
result.url = url;
return result;
}
addMeta(data) {
this.meta = extend({}, this.meta, data);
return this;
}
addSelfLink(base = null, key = 'id') {
let qualifier = base ? base : this.context.qualifier;
qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':';
this.linkSelf('self', `${qualifier}${this.type}:${this.document[key]}`);
}
linkSelf(name, value) {
const links = this.document._metadata.links;
const key = Array.isArray(value) ? 'hrefs' : 'href';
links[name] = { type: 'self' };
links[name][key] = value;
}
linkSiblings(name, href) {
const links = this.document._metadata.links;
links[name] = { type: 'siblings', href: href };
}
queue(type, url, context, queue = null) {
const newRequest = Request.create(type, url);
newRequest.context = context;
this.crawler.queue(this, newRequest, queue);
}
queueRoot(type, url) {
this.crawler.queue(this, Request.create(type, url));
}
queueChild(type, url, qualifier) {
const newRequest = Request.create(type, url);
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
if (this.force) {
newRequest.force = this.force;
}
this.crawler.queue(this, newRequest);
}
queueChildren(type, url, context = null) {
const newRequest = Request.create(type, url);
const newContext = extend(this.context || {}, context);
newRequest.context = newContext;
newContext.qualifier = this.document._metadata.links.self.href;
if (this.force) {
newRequest.force = this.force;
}
this.crawler.queue(this, newRequest);
}
markSkip(outcome, message) {
this.processControl = 'skip';
this.outcome = this.outcome || outcome;
this.message = this.message || message;
return this;
}
markRequeue(message) {
this.processControl = 'requeue';
this.message = this.message || message;
return this;
}
shouldSkip() {
return this.processControl;
}
shouldRequeue() {
return this.processControl === 'requeue';
}
eventHelper(references) {
const document = this.document;
// TODO understand if the actor is typically the same as the creator or pusher in the payload
const repo = document.repo ? document.repo.id : null;
const urn = repo ? `urn:repo:${repo}` : `urn:org:${document.org.id}`;
this.linkSelf('self', `${urn}:${this.type}:${document.id}`);
this.linkSelf('actor', `urn:login:${document.actor.id}`);
this.linkSelf('repo', `urn:repo:${document.repo.id}`);
this.linkSelf('org', `urn:org:${document.org.id}`);
this.queueRoot('login', document.actor.url);
this.queueRoot('repo', document.repo.url);
this.queueRoot('org', document.org.url);
return document.payload;
}
getCollectionType() {
const collections = {
orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user'
};
return collections[this.type];
}
}
module.exports = Request;