Merge pull request #9 from Microsoft/jm/executionFixes

object walking and queuing fixes
This commit is contained in:
Jeff McAffer 2016-11-14 14:23:11 -08:00 committed by GitHub
Parent 5ed3e0d868 52d247246b
Commit 4deb1a0e92
9 changed files: 1407 additions and 138 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
node_modules/
typings/
npm-debug.log
coverage/

25
.vscode/launch.json поставляемый
Просмотреть файл

@ -2,13 +2,34 @@
"version": "0.2.0",
"configurations": [
{
"name": "Run mocha",
"name": "Mocha",
"type": "node",
"request": "launch",
"program": "${workspaceRoot}/node_modules/mocha/bin/_mocha",
"stopOnEntry": false,
"args": [
"test/*.js"
"${workspaceRoot}/test/*.js"
],
"cwd": "${workspaceRoot}",
"runtimeExecutable": null,
"runtimeArgs": [
"--nolazy"
],
"env": {
"NODE_ENV": "development"
},
"console": "internalConsole"
},
{
"name": "Coverage",
"type": "node",
"request": "launch",
"program": "${workspaceRoot}/node_modules/istanbul/lib/cli.js",
"stopOnEntry": false,
"args": [
"cover",
"${workspaceRoot}/node_modules/mocha/bin/_mocha",
"${workspaceRoot}/test/*.js"
],
"cwd": "${workspaceRoot}",
"runtimeExecutable": null,

Просмотреть файл

@ -2,6 +2,7 @@ const extend = require('extend');
const moment = require('moment');
const Processor = require('./processor');
const Q = require('q');
const Request = require('./request');
const URL = require('url');
class Crawler {
@ -17,10 +18,11 @@ class Crawler {
this.processor = new Processor();
}
start() {
start(name) {
let requestBox = [];
return this._getRequest(requestBox)
return Q()
.then(this._getRequest.bind(this, requestBox, name))
.then(this._filter.bind(this))
.then(this._fetch.bind(this))
.then(this._convertToDocument.bind(this))
@ -30,96 +32,125 @@ class Crawler {
.then(this._completeRequest.bind(this), this._completeRequest.bind(this))
.catch(this._errorHandler.bind(this, requestBox))
.then(this._logOutcome.bind(this))
.then(this._startNext.bind(this), this._startNext.bind(this));
.then(this._startNext.bind(this, name), this._startNext.bind(this, name));
}
_errorHandler(requestBox, error) {
  // If a request was already popped for this cycle, mark it for requeue so
  // the work will be retried rather than lost.
  if (requestBox[0]) {
    return requestBox[0].markRequeue('Error', error);
  }
  // No request in flight yet: synthesize a placeholder so the rest of the
  // pipeline (outcome logging, _startNext) has something to operate on.
  // It is delayed and skipped so no real processing happens.
  const request = new Request('_errorTrap', null);
  request.markDelay();
  request.markSkip('Error', error);
  requestBox[0] = request;
  return request;
}
_getRequest(requestBox) {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
if (!request) {
request = new request('wait', null);
request.markDelay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
request.start = Date.now();
requestBox[0] = request;
return request;
})
.then(this._acquireLock.bind(this));
}
_acquireLock(request) {
if (request.url) {
return this.locker.lock(request.url, 5 * 60 * 1000).then((lock) => {
request.lock = lock;
return request;
}, error => {
return request.originQueue.abandon(request).finally(() => {
throw error;
});
});
}
}
_releaseLock(request) {
if (request.lock) {
return this.locker.unlock(request.lock)
.then(() => {
return request;
}, error => {
this.logger.error(error);
_getRequest(requestBox, name) {
const self = this;
return Q.try(() => {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
if (!request) {
request = new Request('_blank', null);
request.markDelay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
request.start = Date.now();
request.crawler = self;
request.crawlerName = name;
requestBox[0] = request;
request.promises = [];
return request;
});
}
return Q(request);
}
_completeRequest(request) {
return this._releaseLock(request).finally(() => {
if (request.shouldRequeue()) {
request.attemptCount = request.attemptCount || 1;
if (++request.attemptCount > 5) {
this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
this.queue(request, request, this.deadletterQueue);
} else {
request.addMeta({ attempt: request.attemptCount });
this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
this.queue(request, request, request.originQueue);
}
}
return this._deleteFromQueue(request);
})
.then(this._acquireLock.bind(this));
}
_pop(queue, request = null) {
const self = this;
return (request ? Q(request) : queue.pop()).then(result => {
if (result) {
result.crawler = self;
return Q.try(() => {
return request ? request : queue.pop();
}).then(result => {
if (result && !result.originQueue) {
result.originQueue = queue;
}
return result;
});
}
_startNext(request) {
_acquireLock(request) {
if (!request.url || !this.locker) {
return Q(request);
}
const self = this;
return Q.try(() => {
return self.locker.lock(request.url, 5 * 60 * 1000);
}).then(
lock => {
request.lock = lock;
return request;
},
error => {
// If we could not acquire a lock, abandon the request so it will be returned to the queue.
// If that fails, throw the original error
return Q.try(() => {
request.originQueue.abandon(request);
}).finally(() => { throw error; });
});
}
_releaseLock(request) {
if (!request.url || !this.locker) {
return Q(request);
}
const self = this;
return Q.try(() => {
return this.locker.unlock(request.lock);
}).then(
() => {
request.lock = null;
return request;
},
error => {
request.lock = null;
self.logger.error(error);
return request;
});
}
_completeRequest(request) {
// requeue the request if needed then wait for any accumulated promises and release the lock and clean up the queue
this._requeue(request);
const self = this;
return Q.all(request.promises)
.finally(() => self._releaseLock(request))
.finally(() => self._deleteFromQueue(request))
.then(() => request);
}
_requeue(request) {
if (!request.shouldRequeue() || !request.url) {
return;
}
request.attemptCount = request.attemptCount || 0;
if (++request.attemptCount > 5) {
this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
this.queue(request, request, this.deadletterQueue);
} else {
request.addMeta({ attempt: request.attemptCount });
this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
this.queue(request, request, request.originQueue);
}
}
_startNext(name, request) {
const delay = request.shouldDelay() ? 1000 : 0;
setTimeout(this.start.bind(this), delay);
setTimeout(this.start.bind(this, name), delay);
}
_filter(request) {
if (this._configFilter(request.type, request.url)) {
if (!this._shouldInclude(request.type, request.url)) {
request.markSkip('Filtered');
}
return request;
@ -152,16 +183,16 @@ class Crawler {
throw new Error(`Code: ${status} for: ${request.url}`);
}
if (status === 304 && githubResponse.headers.etag === etag) {
// We have the content for this element. If it is immutable, skip.
// Otherwise get it from the store and process.
if (status === 304) {
// We have the content for this element. If we are forcing, get the content from the
// store and process. Otherwise, skip.
if (!request.force) {
return request.markSkip('Unmodified');
}
return self.store.get(fetchType, request.url).then(document => {
request.document = document;
request.response = githubResponse;
// Our store is up to date so don't
// Our store is up to date so don't store
request.store = false;
return request;
});
@ -175,7 +206,7 @@ class Crawler {
_convertToDocument(request) {
if (request.shouldSkip()) {
return Q.resolve(request);
return Q(request);
}
// If the doc is an array, wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
@ -189,47 +220,39 @@ class Crawler {
fetchedAt: moment.utc().toISOString(),
links: {}
};
request.promises = [];
return Q.resolve(request);
return Q(request);
}
_processDocument(request) {
if (request.shouldSkip()) {
return Q.resolve(request);
return Q(request);
}
let handler = this.processor[request.type];
handler = handler || this[request.type];
if (!handler) {
return request.markSkip('Warning', `No handler found for request type: ${request.type}`);
}
request.document = handler.call(this.processor, request);
return Q.resolve(request);
request.document = this.processor.process(request);
return Q(request);
}
_storeDocument(request) {
// See if we should skip storing the document. Test request.store explicitly for false as it may just not be set.
if (request.shouldSkip() || !this.store || !request.document || request.store === false) {
return Q.resolve(request);
return Q(request);
}
return this.store.upsert(request.document).then((upsert) => {
return this.store.upsert(request.document).then(upsert => {
request.upsert = upsert;
return request;
});
}
_deleteFromQueue(request) {
if (!request.message) {
return Q.resolve(request);
}
return this.normalQueue.done(request).then(() => { return request; });
return request.originQueue.done(request).then(() => request);
}
_logOutcome(request) {
const outcome = request.outcome ? request.outcome : 'Processed';
if (outcome === 'Error') {
this.logger.error(request.message);
const error = (request.message instanceof Error) ? request.message : new Error(request.message);
error.request = request;
this.logger.error(error);
} else {
request.addMeta({ total: Date.now() - request.start });
this.logger.info(`${outcome} ${request.type} [${request.url}] ${request.message || ''}`, request.meta);
@ -239,27 +262,34 @@ class Crawler {
// =============== Helpers ============
// TODO make a queue all and add promises (then) to the code below
queue(request, newRequest, queue = null) {
if (this._configFilter(newRequest.type, newRequest.url)) {
if (!this._shouldInclude(newRequest.type, newRequest.url)) {
this.logger.verbose(`Filtered ${newRequest.type} [${newRequest.url}]`);
return;
}
// Create a new request data structure that has just the things we should queue
const queuable = new Request(newRequest.type, newRequest.url);
queuable.attemptCount = newRequest.attemptCount;
queuable.context = newRequest.context;
queuable.force = newRequest.force;
queuable.subType = newRequest.subType;
queue = queue || this.normalQueue;
request.promises.push(queue.push(newRequest));
request.promises.push(queue.push(queuable));
return request;
}
_configFilter(type, target) {
_shouldInclude(type, target) {
if (!this.config.orgFilter) {
return false;
return true;
}
if (type === 'repo' || type === 'repos' || type === 'org') {
const parsed = URL.parse(target);
const org = parsed.path.split('/')[2];
return !this.config.orgFilter.has(org.toLowerCase());
return this.config.orgFilter.has(org.toLowerCase());
}
return false;
return true;
}
}

Просмотреть файл

@ -25,15 +25,20 @@ class EventFinder {
}
getNewEvents(eventSource) {
const self = this;
return this.requestor.getAll(eventSource).then(self._findNew.bind(self));
return this.requestor.getAll(eventSource).then(this._findNew.bind(this));
}
// Find the events for which we do NOT have a document.
_findNew(events) {
const limit = qlimit(10);
return Q.all(events.filter(limit(event => {
return !this.eventStore.etag('event', event.url, (err, tag));
})));
const self = this;
return Q.all(events.map(qlimit(10)(event => {
return self.eventStore.etag('event', event.url).then(etag => {
return etag ? null : event;
});
}))).then(events => {
return events.filter(event =>event);
});
}
}
module.exports = EventFinder;

Просмотреть файл

@ -1,10 +1,25 @@
const parse = require('parse-link-header');
const queryString = require('query-string');
const Request = require('./request');
const URL = require('url');
class Processor {
constructor() {
}
process(request) {
let handler = this[request.type];
handler = handler || this[request.type];
if (!handler) {
request.markSkip('Warning', `No handler found for request type: ${request.type}`);
return request.document;
}
const result = handler.call(this, request);
result._metadata.version = 1;
return result;
}
collection(request) {
// if there are additional pages, queue them up to be processed. Note that these go
// on the high priority queue so they are loaded before they change much.
@ -15,7 +30,6 @@ class Processor {
const url = request.url + `?page=${i}&per_page=100`;
const newRequest = new Request('page', url);
newRequest.force = request.force;
newRequest.page = i;
newRequest.subType = request.subType;
newRequest.context = { qualifier: request.context.qualifier };
request.crawler.queue(request, newRequest, request.crawler.priorityQueue);
@ -23,17 +37,19 @@ class Processor {
}
// Rewrite the request and document to be a 'page' and then process.
request.page = 1;
request.document._metadata.type = 'page';
return this.page(request);
return this.page(request, 1);
}
page(request) {
page(request, page = null) {
const document = request.document;
const type = request.subType;
const first = document.elements[0];
const qualifier = request.context.qualifier;
request.linkSelf('self', `${qualifier}:${type}:pages:${request.page}`);
if (!page) {
const params = queryString.parse(URL.parse(request.url).search);
page = params.page;
}
request.linkSelf('self', `${qualifier}:${type}:pages:${page}`);
document.elements.forEach(item => {
request.queueChild(type, item.url, qualifier);
});
@ -42,27 +58,28 @@ class Processor {
org(request) {
const document = request.document;
request.addSelfLink('urn:');
request.addRootSelfLink();
request.linkSiblings('repos', `urn:org:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:org');
request.queueChildren('repos', document.repos_url);
// Queue this org's repos and force if we are being forced.
request.queueRoots('repos', document.repos_url, request.force);
// TODO is this "logins"
request.queueChildren('users', document.members_url.replace('{/member}', ''));
request.queueRoots('users', document.members_url.replace('{/member}', ''), request.force);
return document;
}
user(request) {
const document = request.document;
request.addSelfLink('urn:');
request.addRootSelfLink();
request.linkSiblings('repos', `urn:user:${document.id}:repos`);
request.linkSiblings('siblings', 'urn:user');
request.queueChildren('repos', document.repos_url);
request.queueRoots('repos', document.repos_url);
return document;
}
repo(request) {
const document = request.document;
request.addSelfLink('urn:');
request.addRootSelfLink();
request.linkSelf('owner', `urn:login:${document.owner.id}`);
request.linkSelf('parent', `urn:login:${document.owner.id}`);
request.linkSiblings('siblings', `urn:login:${document.owner.id}:repos`);
@ -75,7 +92,7 @@ class Processor {
commit(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(null, 'sha');
request.addSelfLink('sha');
request.linkSelf('repo', `urn:repo:${context.repo}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:commits`);
@ -83,11 +100,11 @@ class Processor {
// document._metadata.links.parent = document._metadata.links.parent;
if (document.author) {
request.linkSelf('author', `urn:login:${document.author.id}`);
request.queueRoot('login', document.author.url);
request.queueRoot('user', document.author.url);
}
if (document.committer) {
request.linkSelf('committer', `urn:login:${document.committer.id}`);
request.queueRoot('login', document.committer.url);
request.queueRoot('user', document.committer.url);
}
if (document.files) {
document.files.forEach(file => {
@ -99,7 +116,7 @@ class Processor {
login(request) {
const document = request.document;
request.addSelfLink('urn:');
request.addRootSelfLink();
request.linkSelf('self', `urn:login:${document.id}`);
// TODO should we do repos here and in the user/org?
request.linkSiblings('repos', `urn:login:${document.id}:repos`);
@ -109,14 +126,13 @@ class Processor {
} else if (document.type === 'User') {
request.queueRoot('user', `https://api.github.com/users/${document.login}`);
}
request.queueChildren('repos', document.repos_url);
return document;
}
issue(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(request);
request.addSelfLink();
request.linkSelf('assignees', document.assignees.map(assignee => { return `urn:login:${assignee.id}`; }));
request.linkSelf('repo', `urn:repo:${context.repo}`);
request.linkSelf('parent', `urn:repo:${context.repo}`);
@ -143,7 +159,7 @@ class Processor {
issue_comment(request) {
const document = request.document;
const context = request.context;
request.addSelfLink(request);
request.addSelfLink();
request.linkSelf('user', `urn:login:${document.user.id}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:issue:${context.issue}:comments`);
request.queue('login', document.user.url);
@ -152,7 +168,7 @@ class Processor {
team(request) {
const document = request.document;
request.addSelfLink(`urn:org:${document.organization.id}`);
request.addSelfLink();
request.linkSelf('org', `urn:org:${document.organization.id}`);
request.linkSelf('login', `urn:login:${document.organization.id}`);
request.linkSiblings('siblings', `urn:org:${document.organization.id}:teams`);

Просмотреть файл

@ -11,8 +11,15 @@ class Request {
return this;
}
addSelfLink(base = null, key = 'id') {
addRootSelfLink() {
this.addSelfLink('id', 'urn:');
}
addSelfLink(key = 'id', base = null) {
let qualifier = base ? base : this.context.qualifier;
if (!qualifier || (typeof qualifier !== 'string' )) {
console.log('bummer');
}
qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':';
this.linkSelf('self', `${qualifier}${this.type}:${this.document[key]}`);
}
@ -35,32 +42,43 @@ class Request {
this.crawler.queue(this, newRequest, queue);
}
queueRoot(type, url) {
this.crawler.queue(this, new Request(type, url));
queueRoot(type, url, force = false) {
const newRequest = new Request(type, url);
newRequest.context = { qualifier: 'urn:' };
newRequest.force = force;
this.crawler.queue(this, newRequest);
}
queueRoots(type, url, force = false) {
const newRequest = new Request(type, url);
const newContext = {};
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
newRequest.force = force;
this.crawler.queue(this, newRequest);
}
queueChild(type, url, qualifier) {
const newRequest = new Request(type, url);
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
if (this.force) {
newRequest.force = this.force;
}
newRequest.force = this.force;
this.crawler.queue(this, newRequest);
}
queueChildren(type, url, context = null) {
const newRequest = new Request(type, url);
const newContext = extend(this.context || {}, context);
newRequest.context = newContext;
newContext.qualifier = this.document._metadata.links.self.href;
if (this.force) {
newRequest.force = this.force;
}
newRequest.context = newContext;
newRequest.force = this.force;
this.crawler.queue(this, newRequest);
}
markSkip(outcome, message) {
if (this.shouldSkip()) {
return this;
}
this.processControl = 'skip';
this.outcome = this.outcome || outcome;
this.message = this.message || message;
@ -68,6 +86,9 @@ class Request {
}
markRequeue(outcome, message) {
if (this.shouldRequeue()) {
return this;
}
this.processControl = 'requeue';
this.outcome = this.outcome || outcome;
this.message = this.message || message;
@ -79,6 +100,9 @@ class Request {
}
markDelay() {
if (this.shouldDelay()) {
return this;
}
this.flowControl = 'delay';
return this;
}

Просмотреть файл

@ -27,12 +27,15 @@
"moment": "2.15.2",
"parse-link-header": "^0.4.1",
"q": "1.4.1",
"qlimit": "^0.1.1"
"qlimit": "^0.1.1",
"query-string": "^4.2.3"
},
"devDependencies": {
"chai": "^3.5.0",
"grunt": "^1.0.1",
"grunt-mocha-test": "^0.13.2",
"mocha": "^3.1.2"
"istanbul": "^0.4.5",
"mocha": "^3.1.2",
"sinon": "^1.17.6"
}
}

1168
test/crawlerTests.js Normal file

File diff suppressed because it is too large. Load diff

Просмотреть файл

@ -18,6 +18,7 @@ describe('Event Finder', () => {
expect(found[0].url).to.be.equal('http://test1');
});
});
it('will not find any documents', () => {
const events = [
[{ url: 'http://test1' }, { url: 'http://test2' }]
@ -29,6 +30,7 @@ describe('Event Finder', () => {
expect(found.length).to.be.equal(0);
});
});
it('will stop not finding at first found document', () => {
const events = [
[{ url: 'http://test1' }, { url: 'http://test2' }, { url: 'http://test3' }]
@ -64,10 +66,9 @@ function createStore(documents) {
extend(collection, document);
return collection;
}, {});
result.etag = (type, url, callback) => {
result.etag = (type, url) => {
let result = hash[url];
result = result ? result.etag : null;
return callback ? callback(null, result) : Q(result);
return Q(result ? result.etag : null);
};
return result;
}