extend queueSet, add relations to processor

This commit is contained in:
Jeff McAffer 2016-11-25 18:47:47 -08:00
Родитель 5e0afca516
Коммит b6770b2de1
8 изменённых файлов: 695 добавлений и 154 удалений

Просмотреть файл

@ -34,7 +34,6 @@ class Crawler {
if (options.delay === -1) {
return options.done ? options.done() : null;
}
options.delay = 0;
return Q.try(() => this.processOne(options))
.then(this.log(this._computeDelay.bind(this, options)), this._panic.bind(this, options))
.finally(this.log(this.run.bind(this, options)));
@ -50,18 +49,18 @@ class Crawler {
this.logger.error(error);
}
_computeDelay(options, delaySpec) {
_computeDelay(options, request) {
let delay = options.delay;
if (delay === -1) {
return delay;
}
delay = delay || 0;
const now = Date.now();
const requestGate = now + (delaySpec.shouldDelay() ? 2000 : 0);
const delayGate = delaySpec.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
const optionsGate = now + delay;
const requestGate = request.nextRequestTime || now;
const nextRequestTime = Math.max(optionsGate, requestGate, now);
delay = Math.max(0, nextRequestTime - now);
options.delay = delay;
options.currentDelay = delay;
return delay;
}
@ -96,7 +95,7 @@ class Crawler {
}
}
const request = new Request('_errorTrap', null);
request.markDelay();
request.delay();
request.markSkip('Error', error);
requestBox[0] = request;
return request;
@ -108,7 +107,7 @@ class Crawler {
.then(request => {
if (!request) {
request = new Request('_blank', null);
request.markDelay();
request.delay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
request.start = Date.now();
@ -232,7 +231,7 @@ class Crawler {
// and wait a couple minutes before processing more requests
if (status === 403) {
const delay = 2 * 60 * 1000;
request.delayFor(delay);
request.delay(delay);
request.addMeta({ forbiddenDelay: delay });
return request.markRequeue(`GitHub throttled: ${request.url}`);
}
@ -351,21 +350,21 @@ class Crawler {
return this.queues.pushDead(queuable);
}
queue(request, priority = false) {
queue(request, name = 'normal') {
if (!this._shouldInclude(request.type, request.url)) {
this.logger.verbose(`Filtered ${request.type} [${request.url}]`);
return [];
}
const queuable = this._createQueuable(request);
return priority ? this.queues.pushPriority(queuable) : this.queues.push(queuable);
return this.queues.push(queuable, name);
}
_createQueuable(request) {
// Create a new request data structure that has just the things we should queue
const queuable = new Request(request.type, request.url, request.context);
queuable.attemptCount = request.attemptCount;
queuable.transitivity = request.transitivity;
queuable.fetch = request.fetch;
queuable.transitivity = request.transitivity || 'normal';
queuable.fetch = request.fetch || 'normal';
queuable.subType = request.subType;
return queuable;
}
@ -386,7 +385,7 @@ class Crawler {
const retryAfter = parseInt(response.headers['Retry-After']) || 0;
if (retryAfter > 0) {
request.addMeta({ retryAfterDelay: retryAfter });
request.delayFor(retryAfter * 1000);
request.delay(retryAfter * 1000);
}
// If we hit the low water mark for requests, proactively sleep until the next ratelimit reset

Просмотреть файл

@ -3,14 +3,18 @@ const queryString = require('query-string');
const Request = require('./request');
const URL = require('url');
// TODO open questions
// * relationship between login, user and org
// * convention for the pluralization of siblings links. Some places it is "issues", others it is "org" and "user"
// * should we track followers on users, repos... Perhaps if there are more than a certain threshold of people involved?
class Processor {
constructor() {
this.version = 1;
this.version = 2;
}
process(request) {
let handler = this[request.type];
handler = handler || this[request.type];
const handler = this._getHandler(request);
if (!handler) {
request.markSkip('Warning', `No handler found for request type: ${request.type}`);
return request.document;
@ -33,7 +37,7 @@ class Processor {
collection(request) {
// if there are additional pages, queue them up to be processed. Note that these go
// on the high priority queue so they are loaded before they change much.
// on the high soon queue so they are loaded before they change much.
const linkHeader = (request.response && request.response.headers) ? request.response.headers.link : null;
if (linkHeader) {
const links = parse(linkHeader);
@ -50,7 +54,7 @@ class Processor {
}
// TODO this is a bit reachy. need a better way to efficiently queue up
// requests that we know are good.
request.track(request.crawler.queues.pushPriority(requests, true));
request.track(request.crawler.queues.push(requests, 'soon'));
}
// Rewrite the request and document to be a 'page' and then process.
@ -67,12 +71,29 @@ class Processor {
page = params.page;
}
request.linkSelf('self', `${qualifier}:${type}:pages:${page}`);
// If the context defines a relation, create a link in this page. This can be used to
// track that a page defines a relation between an entity and a set of entities. For example,
// a repo and its teams. The teams are not exclusively "part of" the repo, they are just related.
if (request.context.relation) {
const relation = request.context.relation;
const handler = this._getHandler(request, relation);
if (!handler) {
request.markSkip('Warning', `No handler found for relation type: ${relation}`);
return document;
}
handler.call(this, request);
}
// TODO can we add a siblings link here to auto-correlate the members of a collection?
document.elements.forEach(item => {
request.queueCollectionElement(type, item.url, qualifier);
});
return document;
}
_getHandler(request, type = request.type) {
return this[type];
}
org(request) {
const document = request.document;
request.addRootSelfLink();
@ -84,6 +105,9 @@ class Processor {
}
user(request) {
// TODO links to consider
// * followers
// * following
const document = request.document;
request.addRootSelfLink();
request.linkSiblings('repos', `urn:login:${document.id}:repos`);
@ -93,17 +117,49 @@ class Processor {
}
// Process a 'repo' document: add identity and relationship links, then queue
// the repo's related collections (teams, collaborators, contributors,
// subscribers) and child collections (issues, commits) for crawling.
repo(request) {
// TODO links to consider
// * forks
// * deployments
// * labels
const document = request.document;
request.addRootSelfLink();
request.linkSelf('owner', `urn:login:${document.owner.id}`);
// NOTE(review): 'parent' points at the owner login, identical to 'owner' —
// confirm this is intended rather than, say, the forked-from repository.
request.linkSelf('parent', `urn:login:${document.owner.id}`);
request.linkSiblings('siblings', `urn:login:${document.owner.id}:repos`);
request.queueRoot('login', document.owner.url);
// The { relation: ... } context routes each fetched page through the matching
// *_relation handler so the page is recorded as a relation, not a collection.
request.queueRoots('teams', document.teams_url, { relation: 'repo_teams_relation' });
// Strip the URI-template placeholder so a plain, fetchable URL is queued.
request.queueRoots('collaborators', document.collaborators_url.replace('{/collaborator}', ''), { relation: 'repo_collaborators_relation' });
request.queueRoots('contributors', document.contributors_url, { relation: 'repo_contributors_relation' });
request.queueRoots('subscribers', document.subscribers_url, { relation: 'repo_subscribers_relation' });
// Children carry the repo id in their context so their processors can
// qualify self/sibling links back to this repo.
request.queueChildren('issues', document.issues_url.replace('{/number}', ''), { repo: document.id });
request.queueChildren('commits', document.commits_url.replace('{/sha}', ''), { repo: document.id });
return document;
}
repo_teams_relation(request) {
return this._processRelation(request, 'teams', 'repo', 'team');
}
repo_collaborators_relation(request) {
return this._processRelation(request, 'collaborators', 'repo', 'user');
}
repo_contributors_relation(request) {
return this._processRelation(request, 'contributors', 'repo', 'user');
}
repo_subscribers_relation(request) {
return this._processRelation(request, 'contributors', 'repo', 'user');
}
_processRelation(request, name, originType, targetType) {
const document = request.document;
request.linkSelf(originType, `${request.context.qualifier}`);
const urns = document.elements.map(element => `urn:${targetType}:${element.id}`);
request.linkSelf(name, urns);
return document;
}
commit(request) {
const document = request.document;
const context = request.context;
@ -130,6 +186,7 @@ class Processor {
}
login(request) {
// TODO sort out relationship of login to user and org.
const document = request.document;
request.addRootSelfLink();
request.linkSelf('self', `urn:login:${document.id}`);
@ -145,6 +202,12 @@ class Processor {
}
issue(request) {
// TODO links to consider
// * milestone
// * pull request -- all pull requests are issues. Should we queue it up twice? add a link?
// * events -- issue events
// * labels
// * reactions -- get this by using the following Accept header: application/vnd.github.squirrel-girl-preview
const document = request.document;
const context = request.context;
request.addSelfLink();
@ -154,6 +217,7 @@ class Processor {
request.linkSelf('user', `urn:login:${document.user.id}`);
request.linkSiblings('siblings', `urn:repo:${context.repo}:issues`);
request.queueRoot('login', document.user.url);
request.queueRoot('repo', document.repository_url);
if (document.assignee) {
request.linkSelf('assignee', `urn:login:${document.assignee.id}`);
request.queueRoot('login', document.assignee.url);
@ -163,15 +227,13 @@ class Processor {
request.queueRoot('login', document.closed_by.url);
}
// milestone
// pull request
// events
// labels
request.queueChildren('issue_comments', document.comments_url, { issue: document.id, repo: context.repo });
return document;
}
issue_comment(request) {
// TODO links to consider
// * reactions -- get this by using the following Accept header: application/vnd.github.squirrel-girl-preview
const document = request.document;
const context = request.context;
request.addSelfLink();
@ -187,20 +249,17 @@ class Processor {
request.linkSelf('org', `urn:org:${document.organization.id}`);
request.linkSelf('login', `urn:login:${document.organization.id}`);
request.linkSiblings('siblings', `urn:org:${document.organization.id}:teams`);
request.queueChildren('team_members', document.members_url);
request.queueChildren('team_repos', document.repositories_url);
request.queueRoots('team_members', document.members_url, { relation: 'team_members_relation' });
request.queueRoots('team_repos', document.repositories_url, { relation: 'team_repos_relation' });
return document;
}
team_members(request) {
const document = request.document;
request.addSelfLink(`urn:org:${document.organization.id}`);
return document;
team_members_relation(request) {
return this._processRelation(request, 'members', 'team', 'user');
}
team_repos(request) {
request.addSelfLink(`urn:org:${document.organization.id}`);
return document;
team_repos_relation(request) {
return this._processRelation(request, 'repos', 'team', 'repo');
}
// =============== Event Processors ============

Просмотреть файл

@ -2,42 +2,48 @@ const Q = require('q');
const qlimit = require('qlimit');
class QueueSet {
constructor(priority, normal, deadletter) {
this.priority = priority;
this.normal = normal;
constructor(queues, deadletter, weights = [1]) {
this.queues = queues;
this.queueTable = queues.reduce((table, queue) => {
table[queue.name] = queue;
return table;
}, {});
if (this.queues.length > Object.getOwnPropertyNames(this.queueTable).length) {
throw new Error('Duplicate queue names');
}
this.deadletter = deadletter;
this.allQueues = [priority, normal, deadletter];
this.count = 0;
this.startMap = this._createStartMap(weights);
this.popCount = 0;
}
pushPriority(requests) {
return this.push(requests, this.priority);
}
push(requests, queue = this.normal) {
return queue.push(requests);
push(requests, name) {
return this._findQueue(name).push(requests);
}
pushDead(requests) {
return this.push(requests, this.deadletter);
return this.deadletter.push(requests);
}
repush(original, newRequest) {
return this.push(newRequest, original._originQueue);
return original._originQueue.push(newRequest);
}
subscribe() {
return Q.all(this.allQueues.map(queue => { return queue.subscribe(); }));
return Q.all(this.queues.concat([this.deadletter]).map(queue => { return queue.subscribe(); }));
}
unsubscribe() {
return Q.all(this.allQueues.map(queue => { return queue.unsubscribe(); }));
return Q.all(this.queues.concat([this.deadletter]).map(queue => { return queue.unsubscribe(); }));
}
pop() {
return Q()
.then(() => { return (this.count++ % 10) > 3 ? null : this._pop(this.priority); })
.then(this._pop.bind(this, this.normal));
pop(startMap = this.startMap) {
let result = Q();
const start = startMap[this.popCount++ % startMap.length];
for (let i = 0; i < this.queues.length; i++) {
const queue = this.queues[(start + i) % this.queues.length];
result = result.then(this._pop.bind(this, queue));
}
return result;
}
_pop(queue, request = null) {
@ -52,11 +58,39 @@ class QueueSet {
}
done(request) {
return request._originQueue ? request._originQueue.done(request) : Q();
const acked = request.acked;
request.acked = true;
return !acked ? request._originQueue.done(request) : Q();
}
abandon(request) {
return request._originQueue ? request._originQueue.abandon(request) : Q();
const acked = request.acked;
request.acked = true;
return !acked ? request._originQueue.abandon(request) : Q();
}
_findQueue(name) {
const result = this.queueTable[name];
if (!result) {
throw new Error(`Queue not found: ${name}`);
}
return result;
}
_createStartMap(weights) {
if (this.queues.length < weights.length) {
throw new Error('Cannot have more weights than queues');
}
const result = [];
for (let i = 0; i < weights.length; i++) {
for (let j = 0; j < weights[i]; j++) {
result.push(i);
}
}
if (result.length === 0) {
throw new Error('Weights must not be empty');
}
return result;
}
}

Просмотреть файл

@ -52,7 +52,7 @@ class Request {
addSelfLink(key = 'id', base = null) {
let qualifier = base ? base : this.context.qualifier;
if (!qualifier || (typeof qualifier !== 'string')) {
console.log('bummer');
throw new Error('Need something on which to base the self link URN');
}
qualifier = qualifier.endsWith(':') ? qualifier : qualifier + ':';
this.linkSelf('self', `${qualifier}${this.type}:${this.document[key]}`);
@ -73,6 +73,7 @@ class Request {
queue(type, url, context) {
const newRequest = new Request(type, url);
newRequest.context = context;
newRequest.fetch = this.fetch;
this.track(this.crawler.queue(newRequest));
}
@ -85,18 +86,22 @@ class Request {
newRequest.context = { qualifier: 'urn:' };
// set the new request's transitivity to the next value
newRequest.transitivity = transitivity;
newRequest.fetch = this.fetch;
this.track(this.crawler.queue(newRequest));
}
queueRoots(type, url) {
queueRoots(type, url, context = null) {
const transitivity = this._getRootTransitivity();
if (!transitivity) {
return;
}
const newRequest = new Request(type, url);
newRequest.context = { qualifier: this.document._metadata.links.self.href };
const newContext = extend({}, this.context, context);
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
// carry over this requests transitivity as we are queuing a collection
newRequest.transitivity = this.transitivity;
newRequest.fetch = this.fetch;
this.track(this.crawler.queue(newRequest));
}
@ -116,6 +121,7 @@ class Request {
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
newRequest.transitivity = transitivity;
newRequest.fetch = this.fetch;
this.track(this.crawler.queue(newRequest));
}
@ -125,11 +131,12 @@ class Request {
return;
}
const newRequest = new Request(type, url);
const newContext = extend(this.context || {}, context);
const newContext = extend({}, this.context, context);
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
// carry over this requests transitivity as we are queuing a collection
newRequest.transitivity = this.transitivity;
newRequest.fetch = this.fetch;
this.track(this.crawler.queue(newRequest));
}
@ -177,25 +184,13 @@ class Request {
return this.processControl === 'skip' || this.processControl === 'requeue';
}
markDelay() {
if (this.shouldDelay()) {
return this;
}
this.flowControl = 'delay';
return this;
}
shouldDelay() {
return this.flowControl === 'delay';
}
delayUntil(time) {
if (!this.nextRequestTime || this.nextRequestTime < time) {
this.nextRequestTime = time;
}
}
delayFor(milliseconds) {
delay(milliseconds = 2000) {
this.delayUntil(Date.now() + milliseconds);
}
@ -220,7 +215,7 @@ class Request {
getCollectionType() {
const collections = {
orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user'
orgs: 'org', repos: 'repo', issues: 'issue', issue_comments: 'issue_comment', commits: 'commit', teams: 'team', users: 'user', team_members: 'user', team_repos: 'repo', collaborators: 'user', contributors: 'user', subscribers: 'user'
};
return collections[this.type];
}

Просмотреть файл

@ -10,15 +10,15 @@ const sinon = require('sinon');
describe('Crawler get request', () => {
it('should get from the priority queue first', () => {
const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('priority');
expect(request._originQueue === queues.priority).to.be.true;
expect(request._originQueue === priority).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.crawlerName).to.be.equal('test');
expect(request).to.be.equal(requestBox[0]);
@ -26,15 +26,15 @@ describe('Crawler get request', () => {
});
it('should get from the normal queue if no priority', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, { name: 'test' }).then(request => {
expect(request.type).to.be.equal('normal');
expect(request._originQueue === queues.normal).to.be.true;
expect(request._originQueue === normal).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.crawlerName).to.be.equal('test');
expect(request).to.be.equal(requestBox[0]);
@ -42,8 +42,8 @@ describe('Crawler get request', () => {
});
it('should return a dummy skip/delay request if none are queued', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
@ -51,15 +51,15 @@ describe('Crawler get request', () => {
expect(request.type).to.be.equal('_blank');
expect(request.lock).to.be.undefined;
expect(request.shouldSkip()).to.be.true;
expect(request.flowControl).to.be.equal('delay');
expect(request.nextRequestTime - Date.now()).to.be.approximately(2000, 4);
expect(request.crawlerName).to.be.equal('test');
expect(request).to.be.equal(requestBox[0]);
});
});
it('should throw when normal pop errors', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { throw new Error('normal test'); } });
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { throw new Error('normal test'); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
@ -70,8 +70,8 @@ describe('Crawler get request', () => {
});
it('should throw when priority pop errors', () => {
const priority = createBaseQueue({ pop: () => { throw new Error('priority test'); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const priority = createBaseQueue('priority', { pop: () => { throw new Error('priority test'); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
@ -82,8 +82,8 @@ describe('Crawler get request', () => {
});
it('should throw when acquire lock errors', () => {
const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
@ -96,14 +96,14 @@ describe('Crawler get request', () => {
it('should abandon the request when the lock cannot be acquired', () => {
const abandoned = [];
const priority = createBaseQueue({
const priority = createBaseQueue('priority', {
pop: () => { return Q(new Request('priority', 'http://test')); },
abandon: request => {
abandoned.push(request);
return Q();
}
});
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
@ -118,11 +118,11 @@ describe('Crawler get request', () => {
it('should get lock error even if abandon fails', () => {
const abandoned = [];
const priority = createBaseQueue({
const priority = createBaseQueue('priority', {
pop: () => { return Q(new Request('priority', 'http://test')); },
abandon: request => { throw new Error('Abandon error'); }
});
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
@ -423,7 +423,7 @@ describe('Crawler error handler', () => {
const error = 'error';
const request = crawler._errorHandler(box, error);
expect(request.message).to.be.equal(error);
expect(request.flowControl).to.be.equal('delay');
expect(request.nextRequestTime - Date.now()).to.be.approximately(2000, 4);
});
});
@ -500,7 +500,7 @@ describe('Crawler queue', () => {
it('should not queue if filtered', () => {
const config = { orgFilter: new Set(['test']) };
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
@ -512,7 +512,7 @@ describe('Crawler queue', () => {
it('should queue if not filtered', () => {
const config = { orgFilter: new Set(['microsoft']) };
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
@ -528,7 +528,7 @@ describe('Crawler queue', () => {
it('should queue in supplied queue', () => {
const config = { orgFilter: new Set(['microsoft']) };
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
@ -551,7 +551,7 @@ describe('Crawler requeue', () => {
it('should requeue in same queue as before', () => {
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const crawler = createBaseCrawler({ queues: queues });
for (let i = 0; i < 5; i++) {
@ -574,8 +574,8 @@ describe('Crawler requeue', () => {
it('should requeue in deadletter queue after 5 attempts', () => {
const queue = [];
const deadletterQueue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const deadletter = createBaseQueue({ push: request => { deadletterQueue.push(request); return Q(); } });
const normal = createBaseQueue('normal', { push: request => { queue.push(request); return Q(); } });
const deadletter = createBaseQueue('deadletter', { push: request => { deadletterQueue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal, deadletter: deadletter });
const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
request.attemptCount = 5;
@ -597,7 +597,7 @@ describe('Crawler complete request', () => {
it('should unlock, dequeue and return the request being completed', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const normal = createBaseQueue('normal', { done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
@ -618,7 +618,7 @@ describe('Crawler complete request', () => {
const queue = [];
const done = [];
const unlock = [];
const normal = createBaseQueue({
const normal = createBaseQueue('normal', {
push: request => { queue.push(request); return Q(); },
done: request => { done.push(request); return Q(); }
});
@ -646,7 +646,7 @@ describe('Crawler complete request', () => {
it('should do all right things for requests with no url', () => {
const done = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const normal = createBaseQueue('normal', { done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const originalRequest = new Request('test', null);
originalRequest.markRequeue();
@ -664,7 +664,7 @@ describe('Crawler complete request', () => {
const done = [];
const unlock = [];
const promiseValue = [];
const normal = createBaseQueue({
const normal = createBaseQueue('normal', {
done: request => {
if (!promiseValue[0]) assert.fail();
done.push(request);
@ -700,7 +700,7 @@ describe('Crawler complete request', () => {
it('still dequeues and unlocks if promises fail', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const normal = createBaseQueue('normal', { done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
@ -721,7 +721,7 @@ describe('Crawler complete request', () => {
it('still dequeues when unlocking fails', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const normal = createBaseQueue('normal', { done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: () => { throw new Error('sigh'); } });
const originalRequest = new Request('test', 'http://test.com');
@ -742,7 +742,7 @@ describe('Crawler complete request', () => {
it('still unlocks when dequeue fails', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: () => { throw new Error('sigh'); } });
const normal = createBaseQueue('normal', { done: () => { throw new Error('sigh'); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
@ -946,17 +946,17 @@ describe('Crawler run', () => {
});
describe('Crawler whole meal deal', () => {
it('should delay starting next iteration when markDelay', () => {
it('should delay starting next iteration when delay', () => {
const crawler = createBaseCrawler();
crawler.run = () => { };
crawler.processOne = () => { return Q(request) };
const request = new Request('user', 'http://test.com/users/user1');
request.markDelay();
request.delay();
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.equal(2000);
expect(options.currentDelay).to.be.approximately(2000, 4);
});
});
@ -970,33 +970,35 @@ describe('Crawler whole meal deal', () => {
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.approximately(323, 4);
expect(options.currentDelay).to.be.approximately(323, 4);
});
});
it('should delay starting next iteration when delayFor', () => {
it('should delay starting next iteration when delay', () => {
const crawler = createBaseCrawler();
crawler.run = () => { };
crawler.processOne = () => { return Q(request) };
const request = new Request('user', 'http://test.com/users/user1');
request.delayFor(451);
request.delay(451);
const options = { name: 'foo', delay: 0 };
return crawler._run(options).then(() => {
expect(options.delay).to.be.approximately(451, 4);
expect(options.currentDelay).to.be.approximately(451, 4);
});
});
it('should process normal requests', () => {
const crawler = createFullCrawler();
const normal = crawler.queues.queueTable['normal'];
const priority = crawler.queues.queueTable['priority'];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
expect(priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(normal.pop.callCount).to.be.equal(1, 'normal call count');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1, 'lock call count');
@ -1025,7 +1027,7 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(normal.done.callCount).to.be.equal(1);
expect(crawler.logger.info.callCount).to.be.equal(1);
},
@ -1038,19 +1040,22 @@ describe('Crawler whole meal deal', () => {
it('should handle getRequest reject', () => {
const crawler = createFullCrawler();
const priority = crawler.queues.queueTable['priority'];
// setup a problem popping
const normal = createBaseQueue();
const normal = createBaseQueue('normal');
sinon.stub(normal, 'pop', () => { throw Error('cant pop') });
sinon.stub(normal, 'push', request => { return Q(); });
sinon.spy(normal, 'done');
crawler.queues.normal = normal;
// hmmm, hack in the new normal queue
crawler.queues.queueTable['normal'] = normal;
crawler.queues.queues[1] = normal;
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(0);
@ -1061,7 +1066,7 @@ describe('Crawler whole meal deal', () => {
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(0);
const push = crawler.queues.normal.push;
const push = normal.push;
expect(push.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
@ -1070,7 +1075,7 @@ describe('Crawler whole meal deal', () => {
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(0);
expect(crawler.queues.normal.done.callCount).to.be.equal(0);
expect(normal.done.callCount).to.be.equal(0);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -1081,14 +1086,16 @@ describe('Crawler whole meal deal', () => {
it('should handle fetch reject', () => {
const crawler = createFullCrawler();
const normal = crawler.queues.queueTable['normal'];
const priority = crawler.queues.queueTable['priority'];
// setup a good request but a server error response
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
@ -1103,7 +1110,7 @@ describe('Crawler whole meal deal', () => {
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const push = crawler.queues.normal.push;
const push = normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1116,7 +1123,7 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -1128,13 +1135,15 @@ describe('Crawler whole meal deal', () => {
it('should handle process document reject', () => {
const crawler = createFullCrawler();
crawler.processor = { process: () => { throw new Error('bad processor') } };
const priority = crawler.queues.queueTable['priority'];
const normal = crawler.queues.queueTable['normal'];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
expect(priority.pop.callCount).to.be.equal(1);
expect(normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
@ -1151,7 +1160,7 @@ describe('Crawler whole meal deal', () => {
expect(crawler._errorHandler.callCount).to.be.equal(1);
const push = crawler.queues.normal.push;
const push = normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1164,7 +1173,7 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -1176,8 +1185,9 @@ describe('Crawler whole meal deal', () => {
it('should handle store document reject', () => {
const crawler = createFullCrawler();
crawler.store = { upsert: () => { throw new Error('bad upsert') } };
const normal = crawler.queues.queueTable['normal'];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
@ -1185,9 +1195,9 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(normal.done.callCount).to.be.equal(1);
const push = crawler.queues.normal.push;
const push = normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1203,12 +1213,13 @@ describe('Crawler whole meal deal', () => {
it('should handle complete request reject', () => {
const crawler = createFullCrawler();
crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
const normal = crawler.queues.queueTable['normal'];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => { return crawler.processOne({ name: 'test' }); }).then(
() => {
const push = crawler.queues.normal.push;
const push = normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1223,11 +1234,11 @@ describe('Crawler whole meal deal', () => {
});
function createFullCrawler() {
const priority = createBaseQueue();
const priority = createBaseQueue('priority');
priority.requests = [];
sinon.stub(priority, 'pop', () => { return Q(priority.requests.shift()); });
const normal = createBaseQueue();
const normal = createBaseQueue('normal');
normal.requests = [];
sinon.stub(normal, 'pop', () => { return Q(normal.requests.shift()); });
sinon.stub(normal, 'push', request => { return Q(); });
@ -1299,11 +1310,11 @@ function createBaseCrawler({queues = createBaseQueues(), store = createBaseStore
}
function createBaseQueues({ priority = null, normal = null, deadletter = null} = {}) {
return new QueueSet(priority || createBaseQueue(), normal || createBaseQueue(), deadletter || createBaseQueue());
return new QueueSet([priority || createBaseQueue('priority'), normal || createBaseQueue('normal')], deadletter || createBaseQueue('deadletter'));
}
function createBaseQueue({ pop = null, push = null, done = null, abandon = null} = {}) {
const result = {};
function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null} = {}) {
const result = { name: name };
result.pop = pop || (() => assert.fail('should not pop'));
result.push = push || (() => assert.fail('should not push'));
result.done = done || (() => assert.fail('should not done'));

Просмотреть файл

@ -51,15 +51,16 @@ describe('Collection processing', () => {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'issue', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
request.crawler = { queue: () => { }, queues: { push: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const push = sinon.spy(request.crawler.queues, 'push');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(request.crawler.queues.push.callCount).to.be.equal(1);
expect(push.getCall(0).args[1]).to.be.equal('soon');
const newPages = request.crawler.queues.push.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceNormal');
expect(newPages[0].url).to.be.equal('http://test.com/issues?page=2&per_page=100');
@ -81,15 +82,17 @@ describe('Collection processing', () => {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'org', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
request.crawler = { queue: () => { }, queues: { push: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const push = sinon.spy(request.crawler.queues, 'push');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(push.callCount).to.be.equal(1);
expect(push.getCall(0).args[1]).to.be.equal('soon');
const newPages = push.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceNormal');
expect(newPages[0].url).to.be.equal('http://test.com/orgs?page=2&per_page=100');
@ -111,15 +114,16 @@ describe('Collection processing', () => {
headers: { link: createLinkHeader(request.url, null, 2, 2) }
};
request.document = { _metadata: { links: {} }, elements: [{ type: 'org', url: 'http://child1' }] };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
request.crawler = { queue: () => { }, queues: { push: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const push = sinon.spy(request.crawler.queues, 'push');
const processor = new Processor();
processor.collection(request);
expect(request.crawler.queues.pushPriority.callCount).to.be.equal(1);
const newPages = request.crawler.queues.pushPriority.getCall(0).args[0];
expect(push.callCount).to.be.equal(1);
expect(push.getCall(0).args[1]).to.be.equal('soon');
const newPages = push.getCall(0).args[0];
expect(newPages.length).to.be.equal(1);
expect(newPages[0].transitivity).to.be.equal('forceForce');
expect(newPages[0].url).to.be.equal('http://test.com/orgs?page=2&per_page=100');
@ -149,9 +153,54 @@ describe('Collection processing', () => {
expect(newRequest.url).to.be.equal('http://child1');
expect(newRequest.type).to.be.equal('org');
});
});
describe('URN building', () => {
it('should create urn for team members', () => {
const request = new Request('repo', 'http://test.com/foo');
request.document = { _metadata: { links: {} }, id: 42, owner: { url: 'http://test.com/test' }, teams_url: 'http://test.com/teams', issues_url: 'http://test.com/issues', commits_url: 'http://test.com/commits', collaborators_url: 'http://test.com/collaborators' };
request.crawler = { queue: () => { }, queues: { pushPriority: () => { } } };
sinon.spy(request.crawler, 'queue');
sinon.spy(request.crawler.queues, 'pushPriority');
const processor = new Processor();
processor.repo(request);
expect(request.crawler.queue.callCount).to.be.at.least(4);
const teamsRequest = request.crawler.queue.getCall(1).args[0];
expect(teamsRequest.context.qualifier).to.be.equal('urn:repo:42');
expect(teamsRequest.context.relation).to.be.equal('repo_teams_relation');
request.crawler.queue.reset();
teamsRequest.type = 'collection';
teamsRequest.subType = 'team';
teamsRequest.document = { _metadata: { links: {} }, elements: [{ id: 13, url: 'http://team1' }] };
teamsRequest.crawler = request.crawler;
const teamsPage = processor.collection(teamsRequest);
const links = teamsPage._metadata.links;
expect(links.teams.type).to.be.equal('self');
expect(links.teams.hrefs.length).to.be.equal(1);
expect(links.teams.hrefs[0]).to.be.equal('urn:team:13');
expect(links.repo.type).to.be.equal('self');
expect(links.repo.href).to.be.equal('urn:repo:42');
const teamRequest = request.crawler.queue.getCall(0).args[0];
expect(teamRequest.type).to.be.equal('team');
expect(teamRequest.context.qualifier).to.be.equal('urn:');
request.crawler.queue.reset();
teamRequest.document = { _metadata: { links: {} }, id: 54, organization: { id: 87 }, members_url: "http://team1/members", repositories_url: "http://team1/repos" };
teamRequest.crawler = request.crawler;
processor.team(teamRequest);
const membersRequest = request.crawler.queue.getCall(0).args[0];
expect(membersRequest.url).to.be.equal('http://team1/members');
expect(membersRequest.context.qualifier).to.be.equal('urn:team:54');
expect(membersRequest.context.relation).to.be.equal('team_members_relation');
const reposRequest = request.crawler.queue.getCall(1).args[0];
expect(reposRequest.url).to.be.equal('http://team1/repos');
expect(reposRequest.context.qualifier).to.be.equal('urn:team:54');
expect(reposRequest.context.relation).to.be.equal('team_repos_relation');
});
});
function createLinkHeader(target, previous, next, last) {
separator = target.includes('?') ? '&' : '?';

256
test/queueSetTests.js Normal file
Просмотреть файл

@ -0,0 +1,256 @@
const assert = require('chai').assert;
const chai = require('chai');
const expect = require('chai').expect;
const Request = require('../lib/request.js');
const Q = require('q');
const QueueSet = require('../lib/queueSet.js');
const sinon = require('sinon');
describe('QueueSet construction', () => {
it('should throw on duplicate queue names', () => {
expect(() => new QueueSet([{ name: '1' }, { name: '1' }])).to.throw(Error);
});
});
describe('QueueSet weighting', () => {
it('should create a simple startMap', () => {
const set = new QueueSet([{ name: '1' }, { name: '2' }], null, [3, 2]);
expect(set.startMap.length).to.be.equal(5);
expect(set.startMap[0]).to.be.equal(0);
expect(set.startMap[2]).to.be.equal(0);
expect(set.startMap[3]).to.be.equal(1);
expect(set.startMap[4]).to.be.equal(1);
});
it('should create a default startMap if no weights given', () => {
const set = new QueueSet([{ name: '1' }, { name: '2' }]);
expect(set.startMap.length).to.be.equal(1);
expect(set.startMap[0]).to.be.equal(0);
});
it('should throw if too many weights are given', () => {
expect(() => new QueueSet([{ name: '1' }, { name: '2' }], null, [3, 2, 1])).to.throw(Error);
});
it('should throw if no weights are given', () => {
expect(() => new QueueSet([{ name: '1' }, { name: '2' }], null, [])).to.throw(Error);
});
it('should create a simple startMap', () => {
const set = new QueueSet([{ name: '1' }, { name: '2' }], null, [3, 2]);
expect(set.startMap.length).to.be.equal(5);
expect(set.startMap[0]).to.be.equal(0);
expect(set.startMap[2]).to.be.equal(0);
expect(set.startMap[3]).to.be.equal(1);
expect(set.startMap[4]).to.be.equal(1);
});
it('should pop from first with default weights', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('priority');
expect(first._originQueue === priority).to.be.true;
expect(second.type).to.be.equal('priority');
expect(second._originQueue === priority).to.be.true;
});
});
it('should pop in order when requests always available', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('priority');
expect(first._originQueue === priority).to.be.true;
expect(second.type).to.be.equal('normal');
expect(second._originQueue === normal).to.be.true;
});
});
it('should pop from subsequent if previous queue is empty', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(null); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('normal');
expect(first._originQueue === normal).to.be.true;
expect(second.type).to.be.equal('normal');
expect(second._originQueue === normal).to.be.true;
});
});
it('should pop earlier queue if starting later and nothing available', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue('normal', { pop: () => { return Q(null); } });
const queues = createBaseQueues([priority, normal], null, [1, 1]);
queues.popCount = 1;
return Q.all([queues.pop(), queues.pop()]).spread((first, second) => {
expect(first.type).to.be.equal('priority');
expect(first._originQueue === priority).to.be.true;
expect(second.type).to.be.equal('priority');
expect(second._originQueue === priority).to.be.true;
});
});
});
describe('QueueSet pushing', () => {
it('should accept a simple request into a named queue', () => {
const priority = createBaseQueue('priority', { push: (requests, name) => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'push');
const request = new Request('test', 'http://test');
return queues.push(request, 'priority').then(() => {
expect(priority.push.callCount).to.be.equal(1);
expect(priority.push.getCall(0).args[0].type).to.be.equal('test');
});
});
it('should throw when pushing into an unknown queue', () => {
const priority = createBaseQueue('priority', { push: (requests, name) => { return Q(); } });
const normal = createBaseQueue('normal', { push: (requests, name) => { return Q(); } });
const queues = createBaseQueues([priority, normal]);
const request = new Request('test', 'http://test');
expect(() => queues.push(request, 'foo')).to.throw(Error);
});
it('should repush into the same queue', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, push: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'push');
return queues.pop().then(request => {
return queues.repush(request, request).then(() => {
expect(request._originQueue === priority).to.be.true;
expect(priority.push.callCount).to.be.equal(1);
expect(priority.push.getCall(0).args[0].type).to.be.equal('test');
});
});
});
});
describe('QueueSet originQueue management', () => {
it('should call done and mark acked on done', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, done: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'done');
return queues.pop().then(request => {
return queues.done(request).then(() => {
expect(request.acked).to.be.true;
expect(priority.done.callCount).to.be.equal(1);
expect(priority.done.getCall(0).args[0].type).to.be.equal('test');
});
});
});
it('should call done and mark acked on abandon', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'abandon');
return queues.pop().then(request => {
return queues.abandon(request).then(() => {
expect(request.acked).to.be.true;
expect(priority.abandon.callCount).to.be.equal(1);
expect(priority.abandon.getCall(0).args[0].type).to.be.equal('test');
});
});
});
it('should not abandon twice', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'abandon');
return queues.pop().then(request => {
return queues.abandon(request).then(() => {
return queues.abandon(request).then(() => {
expect(request.acked).to.be.true;
expect(priority.abandon.callCount).to.be.equal(1);
expect(priority.abandon.getCall(0).args[0].type).to.be.equal('test');
});
});
});
});
it('should not done after abandon ', () => {
const priority = createBaseQueue('priority', { pop: () => { return Q(new Request('test', 'http://test')); }, abandon: request => { return Q(); }, done: request => { return Q(); } });
const normal = createBaseQueue('normal');
const queues = createBaseQueues([priority, normal]);
sinon.spy(priority, 'abandon');
sinon.spy(priority, 'done');
return queues.pop().then(request => {
return queues.abandon(request).then(() => {
return queues.done(request).then(() => {
expect(request.acked).to.be.true;
expect(priority.done.callCount).to.be.equal(0);
expect(priority.abandon.callCount).to.be.equal(1);
expect(priority.abandon.getCall(0).args[0].type).to.be.equal('test');
});
});
});
});
});
describe('QueueSet subscription management', () => {
it('should subscribe all, including deadletter', () => {
const priority = createBaseQueue('priority', { subscribe: () => { } });
const normal = createBaseQueue('normal', { subscribe: () => { } });
const deadletter = createBaseQueue('deadletter', { subscribe: () => { } });
const queues = createBaseQueues([priority, normal], deadletter);
sinon.spy(priority, 'subscribe');
sinon.spy(normal, 'subscribe');
sinon.spy(deadletter, 'subscribe');
return queues.subscribe().then(() => {
expect(priority.subscribe.callCount).to.be.equal(1);
expect(normal.subscribe.callCount).to.be.equal(1);
expect(deadletter.subscribe.callCount).to.be.equal(1);
});
});
it('should unsubscribe all, including deadletter', () => {
const priority = createBaseQueue('priority', { unsubscribe: () => { } });
const normal = createBaseQueue('normal', { unsubscribe: () => { } });
const deadletter = createBaseQueue('deadletter', { unsubscribe: () => { } });
const queues = createBaseQueues([priority, normal], deadletter);
sinon.spy(priority, 'unsubscribe');
sinon.spy(normal, 'unsubscribe');
sinon.spy(deadletter, 'unsubscribe');
return queues.unsubscribe().then(() => {
expect(priority.unsubscribe.callCount).to.be.equal(1);
expect(normal.unsubscribe.callCount).to.be.equal(1);
expect(deadletter.unsubscribe.callCount).to.be.equal(1);
});
});
});
function createBaseQueues(queues, deadletter, weights) {
return new QueueSet(queues, deadletter || createBaseQueue('deadletter'), weights);
}
function createBaseQueue(name, { pop = null, push = null, done = null, abandon = null, subscribe = null, unsubscribe = null} = {}) {
const result = { name: name };
result.pop = pop || (() => assert.fail('should not pop'));
result.push = push || (() => assert.fail('should not push'));
result.done = done || (() => assert.fail('should not done'));
result.abandon = abandon || (() => assert.fail('should not abandon'));
result.subscribe = subscribe || (() => assert.fail('should not subscribe'));
result.unsubscribe = unsubscribe || (() => assert.fail('should not unsubscribe'));
return result;
}

Просмотреть файл

@ -14,14 +14,43 @@ describe('Request transitivity', () => {
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will not queueRoots if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoots('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will not queueChild if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueChild('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will not queueChildren if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueChildren('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(0);
});
it('will queueRoot normal if normal transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'normal';
request.fetch = 'none';
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoot('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('normal');
expect(request.crawler.queue.getCall(0).args[0].fetch).to.be.equal('none');
});
it('will not queueRoot if forceNone transitivity', () => {
@ -53,6 +82,20 @@ describe('Request transitivity', () => {
expect(request.crawler.queue.getCall(0).args[0].transitivity).to.be.equal('forceNormal');
});
it('queueRoots will not change transitivity and will carry through fetch', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'forceForce';
request.fetch = 'force';
request.document = { _metadata: { links: { self: { href: 'urn:pick:me' } } } };
request.crawler = { queue: () => { } };
sinon.spy(request.crawler, 'queue');
request.queueRoots('foo', 'http://');
expect(request.crawler.queue.callCount).to.be.equal(1);
const newRequest = request.crawler.queue.getCall(0).args[0];
expect(newRequest.transitivity).to.be.equal('forceForce');
expect(newRequest.fetch).to.be.equal('force');
});
it('will not queueChild if none transitivity', () => {
const request = new Request('user', 'http://test.com/users/user1');
request.transitivity = 'none';
@ -107,3 +150,98 @@ describe('Request context/qualifier', () => {
it('will not queueRoot if none transitivity', () => {
});
});
describe('Request link management', () => {
it('will throw if no qualifier available', () => {
const request = new Request('foo', 'http://test');
try {
request.addSelfLink();
assert.fail();
} catch (error) {
expect(error).to.not.be.null;
}
});
it('will add a : to the qualifier', () => {
const request = new Request('foo', 'http://test');
request.document = { id: 4, _metadata: { links: {} } };
request.addSelfLink('id', 'test');
expect(request.document._metadata.links.self.href.startsWith('test:foo'));
});
});
describe('Request promise management', () => {
it('will track single promises', () => {
const request = new Request('test', 'http://test');
request.track('foo');
expect(request.promises.length).to.be.equal(1);
expect(request.promises[0]).to.be.equal('foo');
});
it('will track multiple promises', () => {
const request = new Request('test', 'http://test');
request.track(['foo', 'bar']);
expect(request.promises.length).to.be.equal(2);
expect(request.promises[0]).to.be.equal('foo');
expect(request.promises[1]).to.be.equal('bar');
request.track(['x', 'y']);
expect(request.promises.length).to.be.equal(4);
expect(request.promises[2]).to.be.equal('x');
expect(request.promises[3]).to.be.equal('y');
});
});
describe('Request marking', () => {
it('will markSkip and preserve the first value', () => {
const request = new Request('test', 'http://test');
request.markSkip('foo', 'bar');
expect(request.shouldSkip()).to.be.true;
expect(request.outcome).to.be.equal('foo');
expect(request.message).to.be.equal('bar');
request.markSkip('x', 'y');
expect(request.shouldSkip()).to.be.true;
expect(request.outcome).to.be.equal('foo');
expect(request.message).to.be.equal('bar');
});
it('will markSkip and preserve the first value even if not set', () => {
const request = new Request('test', 'http://test');
request.markSkip();
expect(request.shouldSkip()).to.be.true;
expect(request.outcome).to.be.undefined;
expect(request.message).to.be.undefined;
request.markSkip('x', 'y');
expect(request.shouldSkip()).to.be.true;
expect(request.outcome).to.be.undefined;
expect(request.message).to.be.undefined;
});
it('will markRequeue and preserve the first value', () => {
const request = new Request('test', 'http://test');
request.markRequeue('foo', 'bar');
expect(request.shouldRequeue()).to.be.true;
expect(request.outcome).to.be.equal('foo');
expect(request.message).to.be.equal('bar');
request.markRequeue('x', 'y');
expect(request.shouldRequeue()).to.be.true;
expect(request.outcome).to.be.equal('foo');
expect(request.message).to.be.equal('bar');
});
it('will markRequeue and preserve the first value even if not set', () => {
const request = new Request('test', 'http://test');
request.markRequeue();
expect(request.shouldRequeue()).to.be.true;
expect(request.outcome).to.be.undefined;
expect(request.message).to.be.undefined;
request.markRequeue('x', 'y');
expect(request.shouldRequeue()).to.be.true;
expect(request.outcome).to.be.undefined;
expect(request.message).to.be.undefined;
});
});