From d36c5ab7e219e38304ebad9a64a8d24ab261d9b6 Mon Sep 17 00:00:00 2001 From: Jeff McAffer Date: Thu, 10 Nov 2016 23:59:42 -0800 Subject: [PATCH 1/2] Break crawler into three parts --- index.js | 2 + lib/crawler.js | 456 +++-------------------------------------------- lib/processor.js | 349 ++++++++++++++++++++++++++++++++++++ lib/request.js | 95 ++++++++++ 4 files changed, 466 insertions(+), 436 deletions(-) create mode 100644 lib/processor.js create mode 100644 lib/request.js diff --git a/index.js b/index.js index 92e1a81..c42e4e0 100644 --- a/index.js +++ b/index.js @@ -1,3 +1,5 @@ module.exports.eventFinder = require('./lib/eventFinder'); module.exports.webhookDriver = require('./lib/webhookDriver'); module.exports.crawler = require('./lib/crawler'); +module.exports.request = require('./lib/request'); +module.exports.processor = require('./lib/processor'); diff --git a/lib/crawler.js b/lib/crawler.js index 95656b7..99acc55 100644 --- a/lib/crawler.js +++ b/lib/crawler.js @@ -1,13 +1,10 @@ const extend = require('extend'); const moment = require('moment'); const parse = require('parse-link-header'); +const Processor = require('./processor'); const Q = require('q'); const URL = require('url'); -const collections = { - orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user' -}; - class Crawler { constructor(queue, priorityQueue, store, requestor, config, logger) { this.queue = queue; @@ -16,6 +13,7 @@ class Crawler { this.requestor = requestor; this.config = config; this.logger = logger; + this.processor = new Processor(); } start() { @@ -36,7 +34,13 @@ class Crawler { } _pop(queue, request = null) { - return request ? Q(request) : queue.pop(); + const self = this; + return (request ? Q(request) : queue.pop()).then(result => { + if (result) { + result.crawler = self; + } + return result; + }); } _trackStart(request) { @@ -50,7 +54,7 @@ class Crawler { _filter(request) { if (this._configFilter(request.type, request.url)) { - this._markSkip(request, 'Filtered'); + request.markSkip('Filtered'); } return Q.resolve(request); } @@ -62,7 +66,7 @@ class Crawler { // rewrite the request type for collections remember the collection subType // Also setup 'page' as the document type to look up for etags etc. let fetchType = request.type; - let subType = collections[request.type]; + let subType = request.getCollectionType(); if (subType) { request.type = 'collection'; request.subType = subType; @@ -74,9 +78,9 @@ class Crawler { const start = Date.now(); return self.requestor.get(request.url, options).then(githubResponse => { const status = githubResponse.statusCode; - this._addMeta(request, { status: status, fetch: Date.now() - start }); + request.addMeta({ status: status, fetch: Date.now() - start }); if (status !== 200 && status !== 304) { - self._markSkip(request, 'Error', new Error(`Code: ${status} for: ${request.url}`)); + request.markSkip('Error', new Error(`Code: ${status} for: ${request.url}`)); return request; } @@ -84,7 +88,7 @@ class Crawler { // We have the content for this element. If it is immutable, skip. // Otherwise get it from the store and process. if (!request.force) { - return self._markSkip(request, 'Unmodified'); + return request.markSkip('Unmodified'); } return self.store.get(fetchType, request.url).then(document => { request.document = document; @@ -100,7 +104,7 @@ class Crawler { }); }).catch(error => { // TODO can this request be requeued? - return this._markSkip(request, 'Error', error); + return request.markSkip('Error', error); }); } @@ -128,13 +132,13 @@ class Crawler { if (request.skip) { return Q.resolve(request); } - const handler = this[request.type]; + let handler = this.processor[request.type]; if (!handler) { - this._markSkip(request, 'Warning', `No handler found for request type: ${request.type}`); + request.markSkip('Warning', `No handler found for request type: ${request.type}`); return request; } - request.document = handler.call(this, request); + request.document = handler.call(this.processor, request); return Q.resolve(request); } @@ -162,410 +166,16 @@ class Crawler { if (outcome === 'Error') { this.logger.log(outcome, request.message); } else { - this._addMeta(request, { total: Date.now() - request.start }); + request.addMeta({ total: Date.now() - request.start }); this.logger.log('info', `${outcome} ${request.type} [${request.url}] ${request.message || ''}`, request.meta); } return request; } - _addMeta(request, data) { - request.meta = extend({}, request.meta, data); - return request; - } - - // =============== Entity Processors ============ - - collection(request) { - // if there are additional pages, queue them up to be processed. Note that these go - // on the high priority queue so they are loaded before they change much. - const linkHeader = request.response.headers.link; - if (linkHeader) { - const links = parse(linkHeader); - for (let i = 2; i <= links.last.page; i++) { - const url = request.url + `?page=${i}&per_page=100`; - const context = { qualifier: request.context.qualifier }; - this._queueBase(request, { type: 'page', url: url, subType: request.subType, page: i, force: request.force, context: context }, this.priorityQueue); - } - } - - // Rewrite the request and document to be a 'page' and then process. - request.page = 1; - request.document._metadata.type = 'page'; - return this.page(request); - } - - page(request) { - const document = request.document; - const type = request.subType; - const first = document.elements[0]; - const qualifier = request.context.qualifier; - this._linkSelf(request, 'self', `${qualifier}:${type}:pages:${request.page}`); - document.elements.forEach(item => { - this._queueChild(request, type, item.url, qualifier); - }); - return document; - } - - org(request) { - const document = request.document; - this._addSelfLink(request, 'urn:'); - this._linkSiblings(request, 'repos', `urn:org:${document.id}:repos`); - this._linkSiblings(request, 'siblings', 'urn:org'); - this._queueChildren(request, 'repos', document.repos_url); - // TODO is this "logins" - this._queueChildren(request, 'users', document.members_url.replace('{/member}', '')); - return document; - } - - user(request) { - const document = request.document; - this._addSelfLink(request, 'urn:'); - this._linkSiblings(request, 'repos', `urn:user:${document.id}:repos`); - this._linkSiblings(request, 'siblings', 'urn:user'); - this._queueChildren(request, 'repos', document.repos_url); - return document; - } - - repo(request) { - const document = request.document; - this._addSelfLink(request, 'urn:'); - this._linkSelf(request, 'owner', `urn:login:${document.owner.id}`); - this._linkSelf(request, 'parent', `urn:login:${document.owner.id}`); - this._linkSiblings(request, 'siblings', `urn:login:${document.owner.id}:repos`); - this._queueRoot(request, 'login', document.owner.url); - this._queueChildren(request, 'issues', document.issues_url.replace('{/number}', ''), { repo: document.id }); - this._queueChildren(request, 'commits', document.commits_url.replace('{/sha}', ''), { repo: document.id }); - return document; - } - - commit(request) { - const document = request.document; - const context = request.context; - this._addSelfLink(request, null, 'sha'); - - this._linkSelf(request, 'repo', `urn:repo:${context.repo}`); - this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:commits`); - // TODO not sure what the following line does - // document._metadata.links.parent = document._metadata.links.parent; - if (document.author) { - this._linkSelf(request, 'author', `urn:login:${document.author.id}`); - this._queueRoot(request, 'login', document.author.url); - } - if (document.committer) { - this._linkSelf(request, 'committer', `urn:login:${document.committer.id}`); - this._queueRoot(request, 'login', document.committer.url); - } - if (document.files) { - document.files.forEach(file => { - delete file.patch; - }); - } - return document; - } - - login(request) { - const document = request.document; - this._addSelfLink(request, 'urn:'); - this._linkSelf(request, 'self', `urn:login:${document.id}`); - // TODO should we do repos here and in the user/org? - this._linkSiblings(request, 'repos', `urn:login:${document.id}:repos`); - this._linkSiblings(request, 'siblings', 'urn:login'); - if (document.type === 'Organization') { - this._queueRoot(request, 'org', `https://api.github.com/orgs/${document.login}`); - } else if (document.type === 'User') { - this._queueRoot(request, 'user', `https://api.github.com/users/${document.login}`); - } - this._queueChildren(request, 'repos', document.repos_url); - return document; - } - - issue(request) { - const document = request.document; - const context = request.context; - this._addSelfLink(request); - this._linkSelf(request, 'assignees', document.assignees.map(assignee => { return `urn:login:${assignee.id}`; })); - this._linkSelf(request, 'repo', `urn:repo:${context.repo}`); - this._linkSelf(request, 'parent', `urn:repo:${context.repo}`); - this._linkSelf(request, 'user', `urn:login:${document.user.id}`); - this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:issues`); - this._queueRoot(request, 'login', document.user.url); - if (document.assignee) { - this._linkSelf(request, 'assignee', `urn:login:${document.assignee.id}`); - this._queueRoot(request, 'login', document.assignee.url); - } - if (document.closed_by) { - this._linkSelf(request, 'closed_by', `urn:login:${document.closed_by.id}`); - this._queueRoot(request, 'login', document.closed_by.url); - } - - // milestone - // pull request - // events - // labels - this._queueChildren(request, 'issue_comments', document.comments_url, { issue: document.id, repo: context.repo }); - return document; - } - - issue_comment(request) { - const document = request.document; - const context = request.context; - this._addSelfLink(request); - this._linkSelf(request, 'user', `urn:login:${document.user.id}`); - this._linkSiblings(request, 'siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`); - this._queue(request, 'login', document.user.url); - return document; - } - - team(request) { - const document = request.document; - this._addSelfLink(request, `urn:org:${document.organization.id}`); - this._linkSelf(request, 'org', `urn:org:${document.organization.id}`); - this._linkSelf(request, 'login', `urn:login:${document.organization.id}`); - this._linkSiblings(request, 'siblings', `urn:org:${document.organization.id}:teams`); - this._queueChildren(request, 'team_members', document.members_url); - this._queueChildren(request, 'team_repos', document.repositories_url); - return document; - } - - team_members(request) { - const document = request.document; - this._addSelfLink(request, `urn:org:${document.organization.id}`); - return document; - } - - team_repos(request) { - this._addSelfLink(request, `urn:org:${document.organization.id}`); - return document; - } - - // =============== Event Processors ============ - CommitCommentEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(request); - this._linkSelf(request, 'comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`); - // TODO siblings? - this._queue(request, 'comment', payload.comment.url); - return document; - } - - CreateEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - return document; - } - - DeleteEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - // TODO do something for interesting deletions e.g., where ref-type === 'repository' - return document; - } - - DeploymentEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`); - this._queue(request, 'deployment', payload.deployment.url); - return document; - } - - DeploymentStatusEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'deployment_status', `urn:repo:${context.repo}:deployment:${payload.deployment.id}:status:${payload.deployment_status.id}`); - this._linkSelf(request, 'deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`); - this._queue(request, 'deployment', payload.deployment.url); - return document; - } - - ForkEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - // TODO figure out what else to do - return document; - } - - GollumEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - return document; - } - - IssueCommentEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`); - this._linkSelf(request, 'comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`); - this._queue(request, 'comment', payload.comment.url); - this._queue(request, 'issue', payload.issue.url); - return document; - } - - IssuesEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`); - this._queue(request, 'issue', payload.issue.url); - return document; - } - - LabelEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - return document; - } - - MemberEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'member', `urn:login:${payload.member.id}`); - this._queueRoot(request, 'login', payload.member.url); - return document; - } - - MembershipEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'self', `urn:team:${payload.team.id}:membership_event:${document.id}`); - this._linkSelf(request, 'member', `urn:login:${payload.member.id}`); - this._linkSelf(request, 'team', `urn:team:${payload.team.id}`); - this._linkSelf(request, 'org', `urn:org:${payload.organization.id}`); - this._queueRoot(request, 'login', payload.member.url); - this._queueRoot(request, 'org', payload.organization.url); - this._queue(request, 'team', payload.team.url); - return document; - } - - MilestoneEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'milestone', `urn:repo:${context.repo}:milestone:${payload.milestone.id}`); - this._queue(request, 'milestone', payload.milestone.url); - return document; - } - - PageBuildEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'page_build', `urn:repo:${context.repo}:page_builds:${payload.id}`); - this._queue(request, 'page_build', payload.build.url); - return document; - } - - PublicEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - return document; - } - - PullRequestEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); - this._queue(request, 'pull', payload.pull_request.url); - return document; - } - - PullRequestReviewEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'review', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:review:${payload.review.id}`); - this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); - this._queue(request, 'pull_review', payload.pull_request.review_comment_url.replace('{/number}', `/${payload.review.id}`)); - this._queue(request, 'pull', payload.pull_request.url); - return document; - } - - PullRequestReviewCommentEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - this._linkSelf(request, 'comment', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:comment:${payload.comment.id}`); - this._linkSelf(request, 'pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); - // TODO see if all the various comments can be the same type - this._queue(request, 'pull_comment', payload.comment.url); - this._queue(request, 'pull', payload.pull_request.url); - return document; - } - - PushEvent(request) { - const document = request.document; - const context = request.context; - const payload = this._eventHelper(document); - // TODO figure out what to do with the commits - return document; - } - // =============== Helpers ============ - _addSelfLink(request, base = null, key = 'id') { - let qualifier = base ? base : request.context.qualifier; - qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':'; - this._linkSelf(request, 'self', `${qualifier}${request.type}:${request.document[key]}`); - } - - _linkSelf(request, name, value) { - const links = request.document._metadata.links; - const key = Array.isArray(value) ? 'hrefs' : 'href'; - links[name] = { type: 'self' }; - links[name][key] = value; - } - - _linkSiblings(request, name, href) { - const links = request.document._metadata.links; - links[name] = { type: 'siblings', href: href }; - } - - _queue(request, type, url, context, queue = null) { - const newRequest = { type: type, url: url }; - newRequest.context = context; - this._queueBase(request, newRequest, queue); - } - - _queueRoot(request, type, url) { - this._queueBase(request, { type: type, url: url }); - } - - _queueChild(request, type, url, qualifier) { - const newRequest = { type: type, url: url }; - newRequest.context = request.context || {}; - newRequest.context.qualifier = qualifier; - if (request.force) { - newRequest.force = request.force; - } - this._queueBase(request, newRequest); - } - - _queueChildren(request, type, url, context = null) { - const newRequest = { type: type, url: url }; - const newContext = extend(request.context || {}, context); - newRequest.context = newContext; - newContext.qualifier = request.document._metadata.links.self.href; - if (request.force) { - newRequest.force = request.force; - } - this._queueBase(request, newRequest); - } - // TODO make a queue all and add promises (then) to the code below - _queueBase(request, newRequest, queue = null) { + queueBase(request, newRequest, queue = null) { if (this._configFilter(newRequest.type, newRequest.url)) { this.logger.log('info', `Skipped queuing ${newRequest.type} [${newRequest.url}]`); return; @@ -585,32 +195,6 @@ class Crawler { } return false; } - - _markSkip(request, outcome, message) { - request.skip = true; - request.outcome = request.outcome || outcome; - request.message = request.message || message; - return request; - } - - _eventHelper(request, references) { - const document = request.document; - // TODO understand if the actor is typically the same as the creator or pusher in the payload - const repo = document.repo ? document.repo.id : null; - const urn = repo ? `urn:repo:${repo}` : `urn:org:${document.org.id}`; - this._linkSelf(request, 'self', `${urn}:${request.type}:${document.id}`); - this._linkSelf(request, 'actor', `urn:login:${document.actor.id}`); - this._linkSelf(request, 'repo', `urn:repo:${document.repo.id}`); - this._linkSelf(request, 'org', `urn:org:${document.org.id}`); - this._queueRoot(request, 'login', document.actor.url); - this._queueRoot(request, 'repo', document.repo.url); - this._queueRoot(request, 'org', document.org.url); - return document.payload; - } - - _isCollectionRequest(request) { - return collections.hasOwnProperty(request.type); - } } module.exports = Crawler; \ No newline at end of file diff --git a/lib/processor.js b/lib/processor.js new file mode 100644 index 0000000..eacd888 --- /dev/null +++ b/lib/processor.js @@ -0,0 +1,349 @@ + +class Processor { + constructor() { + } + + collection(request) { + // if there are additional pages, queue them up to be processed. Note that these go + // on the high priority queue so they are loaded before they change much. + const linkHeader = request.response.headers.link; + if (linkHeader) { + const links = parse(linkHeader); + for (let i = 2; i <= links.last.page; i++) { + const url = request.url + `?page=${i}&per_page=100`; + const newRequest = Request.create('page', url); + newRequest.force = request.force; + newRequest.page = i; + newRequest.subType = request.subType; + newRequest.context = { qualifier: request.context.qualifier }; + request.crawler.queueBase(request, newRequest, request.crawler.priorityQueue); + } + } + + // Rewrite the request and document to be a 'page' and then process. + request.page = 1; + request.document._metadata.type = 'page'; + return this.page(request); + } + + page(request) { + const document = request.document; + const type = request.subType; + const first = document.elements[0]; + const qualifier = request.context.qualifier; + request.linkSelf('self', `${qualifier}:${type}:pages:${request.page}`); + document.elements.forEach(item => { + request.queueChild(type, item.url, qualifier); + }); + return document; + } + + org(request) { + const document = request.document; + request.addSelfLink('urn:'); + request.linkSiblings('repos', `urn:org:${document.id}:repos`); + request.linkSiblings('siblings', 'urn:org'); + request.queueChildren('repos', document.repos_url); + // TODO is this "logins" + request.queueChildren('users', document.members_url.replace('{/member}', '')); + return document; + } + + user(request) { + const document = request.document; + request.addSelfLink('urn:'); + request.linkSiblings('repos', `urn:user:${document.id}:repos`); + request.linkSiblings('siblings', 'urn:user'); + request.queueChildren('repos', document.repos_url); + return document; + } + + repo(request) { + const document = request.document; + request.addSelfLink('urn:'); + request.linkSelf('owner', `urn:login:${document.owner.id}`); + request.linkSelf('parent', `urn:login:${document.owner.id}`); + request.linkSiblings('siblings', `urn:login:${document.owner.id}:repos`); + request.queueRoot('login', document.owner.url); + request.queueChildren('issues', document.issues_url.replace('{/number}', ''), { repo: document.id }); + request.queueChildren('commits', document.commits_url.replace('{/sha}', ''), { repo: document.id }); + return document; + } + + commit(request) { + const document = request.document; + const context = request.context; + request.addSelfLink(null, 'sha'); + + request.linkSelf('repo', `urn:repo:${context.repo}`); + request.linkSiblings('siblings', `urn:repo:${context.repo}:commits`); + // TODO not sure what the following line does + // document._metadata.links.parent = document._metadata.links.parent; + if (document.author) { + request.linkSelf('author', `urn:login:${document.author.id}`); + request.queueRoot('login', document.author.url); + } + if (document.committer) { + request.linkSelf('committer', `urn:login:${document.committer.id}`); + request.queueRoot('login', document.committer.url); + } + if (document.files) { + document.files.forEach(file => { + delete file.patch; + }); + } + return document; + } + + login(request) { + const document = request.document; + request.addSelfLink('urn:'); + request.linkSelf('self', `urn:login:${document.id}`); + // TODO should we do repos here and in the user/org? + request.linkSiblings('repos', `urn:login:${document.id}:repos`); + request.linkSiblings('siblings', 'urn:login'); + if (document.type === 'Organization') { + request.queueRoot('org', `https://api.github.com/orgs/${document.login}`); + } else if (document.type === 'User') { + request.queueRoot('user', `https://api.github.com/users/${document.login}`); + } + request.queueChildren('repos', document.repos_url); + return document; + } + + issue(request) { + const document = request.document; + const context = request.context; + request.addSelfLink(request); + request.linkSelf('assignees', document.assignees.map(assignee => { return `urn:login:${assignee.id}`; })); + request.linkSelf('repo', `urn:repo:${context.repo}`); + request.linkSelf('parent', `urn:repo:${context.repo}`); + request.linkSelf('user', `urn:login:${document.user.id}`); + request.linkSiblings('siblings', `urn:repo:${context.repo}:issues`); + request.queueRoot('login', document.user.url); + if (document.assignee) { + request.linkSelf('assignee', `urn:login:${document.assignee.id}`); + request.queueRoot('login', document.assignee.url); + } + if (document.closed_by) { + request.linkSelf('closed_by', `urn:login:${document.closed_by.id}`); + request.queueRoot('login', document.closed_by.url); + } + + // milestone + // pull request + // events + // labels + request.queueChildren('issue_comments', document.comments_url, { issue: document.id, repo: context.repo }); + return document; + } + + issue_comment(request) { + const document = request.document; + const context = request.context; + request.addSelfLink(request); + request.linkSelf('user', `urn:login:${document.user.id}`); + request.linkSiblings('siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`); + request.queue('login', document.user.url); + return document; + } + + team(request) { + const document = request.document; + request.addSelfLink(`urn:org:${document.organization.id}`); + request.linkSelf('org', `urn:org:${document.organization.id}`); + request.linkSelf('login', `urn:login:${document.organization.id}`); + request.linkSiblings('siblings', `urn:org:${document.organization.id}:teams`); + request.queueChildren('team_members', document.members_url); + request.queueChildren('team_repos', document.repositories_url); + return document; + } + + team_members(request) { + const document = request.document; + request.addSelfLink(`urn:org:${document.organization.id}`); + return document; + } + + team_repos(request) { + request.addSelfLink(`urn:org:${document.organization.id}`); + return document; + } + + // =============== Event Processors ============ + CommitCommentEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(request); + request.linkSelf('comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`); + // TODO siblings? + request.queue('comment', payload.comment.url); + return document; + } + + CreateEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + return document; + } + + DeleteEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + // TODO do something for interesting deletions e.g., where ref-type === 'repository' + return document; + } + + DeploymentEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`); + request.queue('deployment', payload.deployment.url); + return document; + } + + DeploymentStatusEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('deployment_status', `urn:repo:${context.repo}:deployment:${payload.deployment.id}:status:${payload.deployment_status.id}`); + request.linkSelf('deployment', `urn:repo:${context.repo}:deployment:${payload.deployment.id}`); + request.queue('deployment', payload.deployment.url); + return document; + } + + ForkEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + // TODO figure out what else to do + return document; + } + + GollumEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + return document; + } + + IssueCommentEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`); + request.linkSelf('comment', `urn:repo:${context.repo}:comment:${payload.comment.id}`); + request.queue('comment', payload.comment.url); + request.queue('issue', payload.issue.url); + return document; + } + + IssuesEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('issue', `urn:repo:${context.repo}:issue:${payload.issue.id}`); + request.queue('issue', payload.issue.url); + return document; + } + + LabelEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + return document; + } + + MemberEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('member', `urn:login:${payload.member.id}`); + request.queueRoot('login', payload.member.url); + return document; + } + + MembershipEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('self', `urn:team:${payload.team.id}:membership_event:${document.id}`); + request.linkSelf('member', `urn:login:${payload.member.id}`); + request.linkSelf('team', `urn:team:${payload.team.id}`); + request.linkSelf('org', `urn:org:${payload.organization.id}`); + request.queueRoot('login', payload.member.url); + request.queueRoot('org', payload.organization.url); + request.queue('team', payload.team.url); + return document; + } + + MilestoneEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('milestone', `urn:repo:${context.repo}:milestone:${payload.milestone.id}`); + request.queue('milestone', payload.milestone.url); + return document; + } + + PageBuildEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('page_build', `urn:repo:${context.repo}:page_builds:${payload.id}`); + request.queue('page_build', payload.build.url); + return document; + } + + PublicEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + return document; + } + + PullRequestEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); + request.queue('pull', payload.pull_request.url); + return document; + } + + PullRequestReviewEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('review', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:review:${payload.review.id}`); + request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); + request.queue('pull_review', payload.pull_request.review_comment_url.replace('{/number}', `/${payload.review.id}`)); + request.queue('pull', payload.pull_request.url); + return document; + } + + PullRequestReviewCommentEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + request.linkSelf('comment', `urn:repo:${context.repo}:pull:${payload.pull_request.id}:comment:${payload.comment.id}`); + request.linkSelf('pull', `urn:repo:${context.repo}:pull:${payload.pull_request.id}`); + // TODO see if all the various comments can be the same type + request.queue('pull_comment', payload.comment.url); + request.queue('pull', payload.pull_request.url); + return document; + } + + PushEvent(request) { + const document = request.document; + const context = request.context; + const payload = request.eventHelper(document); + // TODO figure out what to do with the commits + return document; + } +} + +module.exports = Processor; \ No newline at end of file diff --git a/lib/request.js b/lib/request.js new file mode 100644 index 0000000..a0b3d5d --- /dev/null +++ b/lib/request.js @@ -0,0 +1,95 @@ +const extend = require('extend'); + +class Request { + static create(type, url) { + const result = new Request(); + result.type = type; + result.url = url; + return result; + } + + addMeta(data) { + this.meta = extend({}, this.meta, data); + return this; + } + + addSelfLink(base = null, key = 'id') { + let qualifier = base ? base : this.context.qualifier; + qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':'; + this.linkSelf('self', `${qualifier}${this.type}:${this.document[key]}`); + } + + linkSelf(name, value) { + const links = this.document._metadata.links; + const key = Array.isArray(value) ? 'hrefs' : 'href'; + links[name] = { type: 'self' }; + links[name][key] = value; + } + + linkSiblings(name, href) { + const links = this.document._metadata.links; + links[name] = { type: 'siblings', href: href }; + } + + queue(type, url, context, queue = null) { + const newRequest = Request.create(type, url); + newRequest.context = context; + this.crawler.queueBase(this, newRequest, queue); + } + + queueRoot(type, url) { + this.crawler.queueBase(this, Request.create(type, url)); + } + + queueChild(type, url, qualifier) { + const newRequest = Request.create(type, url); + newRequest.context = this.context || {}; + newRequest.context.qualifier = qualifier; + if (this.force) { + newRequest.force = this.force; + } + this.crawler.queueBase(this, newRequest); + } + + queueChildren(type, url, context = null) { + const newRequest = Request.create(type, url); + const newContext = extend(this.context || {}, context); + newRequest.context = newContext; + newContext.qualifier = this.document._metadata.links.self.href; + if (this.force) { + newRequest.force = this.force; + } + this.crawler.queueBase(this, newRequest); + } + + markSkip(outcome, message) { + this.skip = true; + this.outcome = this.outcome || outcome; + this.message = this.message || message; + return this; + } + + eventHelper(references) { + const document = this.document; + // TODO understand if the actor is typically the same as the creator or pusher in the payload + const repo = document.repo ? document.repo.id : null; + const urn = repo ? `urn:repo:${repo}` : `urn:org:${document.org.id}`; + this.linkSelf('self', `${urn}:${this.type}:${document.id}`); + this.linkSelf('actor', `urn:login:${document.actor.id}`); + this.linkSelf('repo', `urn:repo:${document.repo.id}`); + this.linkSelf('org', `urn:org:${document.org.id}`); + this.queueRoot('login', document.actor.url); + this.queueRoot('repo', document.repo.url); + this.queueRoot('org', document.org.url); + return document.payload; + } + + getCollectionType() { + const collections = { + orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user' + }; + return collections[this.type]; + } +} + +module.exports = Request; \ No newline at end of file From b5df9f166b3b98991d81c163617d8d64c8955129 Mon Sep 17 00:00:00 2001 From: Jeff McAffer Date: Fri, 11 Nov 2016 10:55:30 -0800 Subject: [PATCH 2/2] add error and requeue handling --- lib/crawler.js | 81 +++++++++++++++++++++++++++++++++++------------- lib/processor.js | 4 ++- lib/request.js | 24 +++++++++++--- 3 files changed, 81 insertions(+), 28 deletions(-) diff --git a/lib/crawler.js b/lib/crawler.js index 99acc55..44ece3d 100644 --- a/lib/crawler.js +++ b/lib/crawler.js @@ -1,13 +1,12 @@ const extend = require('extend'); const moment = require('moment'); -const parse = require('parse-link-header'); const Processor = require('./processor'); const Q = require('q'); const URL = require('url'); class Crawler { constructor(queue, priorityQueue, store, requestor, config, logger) { - this.queue = queue; + this.normalQueue = queue; this.priorityQueue = priorityQueue; this.store = store; this.requestor = requestor; @@ -17,37 +16,71 @@ class Crawler { } start() { - return this._pop(this.priorityQueue) - .then(this._pop.bind(this, this.queue)) - .then(this._trackStart.bind(this)) + let requestBox = []; + + return this._getRequest(requestBox) .then(this._filter.bind(this)) .then(this._fetch.bind(this)) .then(this._convertToDocument.bind(this)) .then(this._processDocument.bind(this)) .then(this._storeDocument.bind(this)) - .then(this._deleteFromQueue.bind(this)) - .then(this._logOutcome.bind(this)) + .catch(this._requeueHandler.bind(this, requestBox)) + .then(this._completeRequest.bind(this)) .then(this._startNext.bind(this)) .catch(error => { - this.logger.log('error', `${error.message}`); + this.logger.log('Error', error); }); } + _requeueHandler(requestBox, error) { + return requestBox[0].markRequeue(error); + } + + _getRequest(requestBox) { + return this._pop(this.priorityQueue) + .then(this._pop.bind(this, this.normalQueue)) + .then(request => { + if (!request) { + request = new request('wait', null); + request.delay = 1000; + request.markSkip('Exhausted queue', `Waiting ${request.delay}ms`); + } + request.start = Date.now(); + requestBox[0] = request; + return request; + }); + } + + _completeRequest(request) { + if (request.shouldRequeue()) { + this.logger.log('Info', `Requeuing request ${request.type} for ${request.url}`); + this.queue(request, request); + } + return this._deleteFromQueue(request) + .then(this._wait.bind(this)) + .then(this._logOutcome.bind(this)); + } + + _wait(request) { + if (!request.delay) { + return Q(request); + } + const result = Q.defer(); + setTimeout(() => { result.resolve(request); }, request.delay); + return result; + } + _pop(queue, request = null) { const self = this; return (request ? Q(request) : queue.pop()).then(result => { if (result) { result.crawler = self; + result.originQueue = queue; } return result; }); } - _trackStart(request) { - request.start = Date.now(); - return Q(request); - } - _startNext() { setTimeout(this.start.bind(this), 0); } @@ -60,7 +93,7 @@ class Crawler { } _fetch(request) { - if (request.skip) { + if (request.shouldSkip()) { return Q.resolve(request); } // rewrite the request type for collections remember the collection subType @@ -80,8 +113,10 @@ class Crawler { const status = githubResponse.statusCode; request.addMeta({ status: status, fetch: Date.now() - start }); if (status !== 200 && status !== 304) { - request.markSkip('Error', new Error(`Code: ${status} for: ${request.url}`)); - return request; + if (status === 409) { + return request.markSkip('Error', new Error(`Code: ${status} for: ${request.url}`)); + } + return request.markRequeue(); } if (status === 304 && githubResponse.headers.etag === etag) { @@ -109,7 +144,7 @@ class Crawler { } _convertToDocument(request) { - if (request.skip) { + if (request.shouldSkip()) { return Q.resolve(request); } @@ -129,10 +164,11 @@ class Crawler { } _processDocument(request) { - if (request.skip) { + if (request.shouldSkip()) { return Q.resolve(request); } let handler = this.processor[request.type]; + handler = handler || this[request.type]; if (!handler) { request.markSkip('Warning', `No handler found for request type: ${request.type}`); return request; @@ -144,7 +180,7 @@ class Crawler { _storeDocument(request) { // See if we should skip storing the document. Test request.store explicitly for false as it may just not be set. - if (request.skip || !this.store || !request.document || request.store === false) { + if (request.shouldSkip() || !this.store || !request.document || request.store === false) { return Q.resolve(request); } @@ -158,7 +194,7 @@ class Crawler { if (!request.message) { return Q.resolve(request); } - return this.queue.done(request).then(() => { return request; }); + return this.normalQueue.done(request).then(() => { return request; }); } _logOutcome(request) { @@ -175,13 +211,14 @@ class Crawler { // =============== Helpers ============ // TODO make a queue all and add promises (then) to the code below - queueBase(request, newRequest, queue = null) { + queue(request, newRequest, queue = null) { if (this._configFilter(newRequest.type, newRequest.url)) { this.logger.log('info', `Skipped queuing ${newRequest.type} [${newRequest.url}]`); return; } - queue = queue || this.queue; + queue = queue || this.normalQueue; request.promises.push(queue.push(newRequest)); + return request; } _configFilter(type, target) { diff --git a/lib/processor.js b/lib/processor.js index eacd888..df930d2 100644 --- a/lib/processor.js +++ b/lib/processor.js @@ -1,3 +1,5 @@ +const parse = require('parse-link-header'); +const Request = require('./request'); class Processor { constructor() { @@ -16,7 +18,7 @@ class Processor { newRequest.page = i; newRequest.subType = request.subType; newRequest.context = { qualifier: request.context.qualifier }; - request.crawler.queueBase(request, newRequest, request.crawler.priorityQueue); + request.crawler.queue(request, newRequest, request.crawler.priorityQueue); } } diff --git a/lib/request.js b/lib/request.js index a0b3d5d..9d560fa 100644 --- a/lib/request.js +++ b/lib/request.js @@ -34,11 +34,11 @@ class Request { queue(type, url, context, queue = null) { const newRequest = Request.create(type, url); newRequest.context = context; - this.crawler.queueBase(this, newRequest, queue); + this.crawler.queue(this, newRequest, queue); } queueRoot(type, url) { - this.crawler.queueBase(this, Request.create(type, url)); + this.crawler.queue(this, Request.create(type, url)); } queueChild(type, url, qualifier) { @@ -48,7 +48,7 @@ class Request { if (this.force) { newRequest.force = this.force; } - this.crawler.queueBase(this, newRequest); + this.crawler.queue(this, newRequest); } queueChildren(type, url, context = null) { @@ -59,16 +59,30 @@ class Request { if (this.force) { newRequest.force = this.force; } - this.crawler.queueBase(this, newRequest); + this.crawler.queue(this, newRequest); } markSkip(outcome, message) { - this.skip = true; + this.processControl = 'skip'; this.outcome = this.outcome || outcome; this.message = this.message || message; return this; } + markRequeue(message) { + this.processControl = 'requeue'; + this.message = this.message || message; + return this; + } + + shouldSkip() { + return this.processControl; + } + + shouldRequeue() { + return this.processControl === 'requeue'; + } + eventHelper(references) { const document = this.document; // TODO understand if the actor is typically the same as the creator or pusher in the payload