collapse queues into a queueset

This commit is contained in:
Jeff McAffer 2016-11-17 15:33:44 -08:00
Родитель 9a1546439e
Коммит 5c39058887
6 изменённых файлов: 252 добавления и 138 удалений

Просмотреть файл

@ -1,5 +1,6 @@
module.exports.eventFinder = require('./lib/eventFinder');
module.exports.webhookDriver = require('./lib/webhookDriver');
module.exports.crawler = require('./lib/crawler');
module.exports.request = require('./lib/request');
module.exports.eventFinder = require('./lib/eventFinder');
module.exports.processor = require('./lib/processor');
module.exports.queueSet = require('./lib/queueSet');
module.exports.request = require('./lib/request');
module.exports.webhookDriver = require('./lib/webhookDriver');

Просмотреть файл

@ -6,10 +6,8 @@ const Request = require('./request');
const URL = require('url');
class Crawler {
constructor(queue, priorityQueue, deadletterQueue, store, locker, requestor, config, logger) {
this.normalQueue = queue;
this.priorityQueue = priorityQueue;
this.deadletterQueue = deadletterQueue;
constructor(queues, store, locker, requestor, config, logger) {
this.queues = queues;
this.store = store;
this.locker = locker;
this.requestor = requestor;
@ -48,9 +46,7 @@ class Crawler {
_getRequest(requestBox, name) {
const self = this;
return Q()
.then(self.log(self._pop.bind(self, self.priorityQueue)))
.then(self.log(self._pop.bind(self, self.normalQueue)))
return this.log(this.queues.pop())
.then(request => {
if (!request) {
request = new Request('_blank', null);
@ -67,24 +63,13 @@ class Crawler {
.then(self.log(self._acquireLock.bind(self)));
}
_pop(queue, request = null) {
return Q.try(() => {
return request ? request : queue.pop();
}).then(result => {
if (result && !result.originQueue) {
result.originQueue = queue;
}
return result;
});
}
_acquireLock(request) {
if (!request.url || !this.locker) {
return Q(request);
}
const self = this;
return Q.try(() => {
return this.log(self.locker.lock(request.url, 5 * 60 * 1000), 'lock');
return this.log(self.locker.lock(request.url, 1 * 60 * 1000), 'lock');
}).then(
lock => {
request.lock = lock;
@ -94,8 +79,15 @@ class Crawler {
// If we could not acquire a lock, abandon the request so it will be returned to the queue.
// If that fails, throw the original error
return Q.try(() => {
this.log(request.originQueue.abandon(request), 'abandon');
}).finally(() => { throw error; });
this.log(this.queues.abandon(request), 'abandon');
}).finally(() => {
// don't throw if it is the normal Exceeded scenario. It is not an "error", just someone else is processing.
if (error.message.startsWith('Exceeded')) {
request.markRequeue('Requeued', `Could not lock ${request.url}`);
return request;
}
throw error;
});
});
}
@ -135,11 +127,11 @@ class Crawler {
request.attemptCount = request.attemptCount || 0;
if (++request.attemptCount > 5) {
this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
this.queue(request, request, this.deadletterQueue);
request.track(this._queueDead(request, request));
} else {
request.addMeta({ attempt: request.attemptCount });
this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
this.queue(request, request, request.originQueue);
request.track(this._requeueOrigin(request));
}
}
@ -149,6 +141,9 @@ class Crawler {
const delayGate = request.nextRequestTime || now;
const nextRequestTime = Math.max(requestGate, delayGate, now);
const delay = Math.max(0, nextRequestTime - now);
if (delay) {
this.logger.verbose(`Crawler: ${name} waiting for ${delay}ms`);
}
setTimeout(this.start.bind(this, name), delay);
}
@ -202,6 +197,10 @@ class Crawler {
return request.markSkip('Unmodified');
}
return self.store.get(fetchType, request.url).then(document => {
// if the doc had stored headers (e.g., page responses) then reconstitute them for processing
if (document._metadata.headers) {
Object.assign(githubResponse.headers, document._metadata.headers);
}
request.document = document;
request.response = githubResponse;
// Our store is up to date so don't store
@ -221,16 +220,21 @@ class Crawler {
return Q(request);
}
// If the doc is an array, wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
// If the doc is an array,
// * wrap it in an object to make storage more consistent (Mongo can't store arrays directly)
// * save the link header as GitHub will not return those in a subsequent 304
let headers = {};
if (Array.isArray(request.document)) {
request.document = { elements: request.document };
headers = { link: request.response.headers.link };
}
request.document._metadata = {
type: request.type,
url: request.url,
etag: request.response.headers.etag,
fetchedAt: moment.utc().toISOString(),
links: {}
links: {},
headers: headers
};
return Q(request);
}
@ -256,7 +260,7 @@ class Crawler {
}
_deleteFromQueue(request) {
return request.originQueue.done(request).then(() => request);
return this.queues.done(request).then(() => { return request; });
}
_logOutcome(request) {
@ -266,7 +270,7 @@ class Crawler {
error.request = request;
this.logger.error(error);
} else {
request.addMeta({ total: Date.now() - request.start });
request.addMeta({ time: Date.now() - request.start });
this.logger.info(`${outcome} ${request.type} [${request.url}] ${request.message || ''}`, request.meta);
}
return request;
@ -274,22 +278,33 @@ class Crawler {
// =============== Helpers ============
queue(request, newRequest, queue = null) {
if (!this._shouldInclude(newRequest.type, newRequest.url)) {
this.logger.verbose(`Filtered ${newRequest.type} [${newRequest.url}]`);
return;
_requeueOrigin(request) {
const queuable = this._createQueuable(request);
return this.queues.repush(request, queuable);
}
_queueDead(request) {
const queuable = this._createQueuable(request);
return this.queues.pushDead(queuable);
}
queue(request, priority = false) {
if (!this._shouldInclude(request.type, request.url)) {
this.logger.verbose(`Filtered ${request.type} [${request.url}]`);
return [];
}
const queuable = this._createQueuable(request);
return priority ? this.queues.pushPriority(queuable) : this.queues.push(queuable);
}
_createQueuable(request) {
// Create a new request data structure that has just the things we should queue
const queuable = new Request(newRequest.type, newRequest.url);
queuable.attemptCount = newRequest.attemptCount;
queuable.context = newRequest.context;
queuable.force = newRequest.force;
queuable.subType = newRequest.subType;
queue = queue || this.normalQueue;
request.promises.push(queue.push(queuable));
return request;
const queuable = new Request(request.type, request.url);
queuable.attemptCount = request.attemptCount;
queuable.context = request.context;
queuable.force = request.force;
queuable.subType = request.subType;
return queuable;
}
_shouldInclude(type, target) {
@ -314,6 +329,7 @@ class Crawler {
// If we hit the low water mark for requests, proactively sleep until the next ratelimit reset
// This code is not designed to handle the 403 scenarios. That is handled by the retry logic.
const remaining = parseInt(response.headers['x-ratelimit-remaining']) || 0;
request.addMeta({ remaining: remaining });
const tokenLowerBound = this.config ? (this.config.tokenLowerBound || 50) : 50;
if (remaining < tokenLowerBound) {
const reset = parseInt(response.headers['x-ratelimit-reset']) || 0;

Просмотреть файл

@ -26,14 +26,18 @@ class Processor {
const linkHeader = request.response.headers.link;
if (linkHeader) {
const links = parse(linkHeader);
const requests = [];
for (let i = 2; i <= links.last.page; i++) {
const url = request.url + `?page=${i}&per_page=100`;
const newRequest = new Request('page', url);
newRequest.force = request.force;
newRequest.subType = request.subType;
newRequest.context = { qualifier: request.context.qualifier };
request.crawler.queue(request, newRequest, request.crawler.priorityQueue);
requests.push(newRequest);
}
// TODO this is a bit reachy. need a better way to efficiently queue up
// requests that we know are good.
request.track(request.crawler.queues.pushPriority(requests, true));
}
// Rewrite the request and document to be a 'page' and then process.

62
lib/queueSet.js Normal file
Просмотреть файл

@ -0,0 +1,62 @@
const Q = require('q');
const qlimit = require('qlimit'); // NOTE(review): required but unused in this file — confirm before removing

/**
 * Aggregates the crawler's priority, normal and deadletter queues behind a
 * single interface so callers no longer need to track which physical queue a
 * request was popped from or should be pushed to.
 */
class QueueSet {
  /**
   * @param {object} priority - queue for high-priority requests
   * @param {object} normal - queue for regular requests
   * @param {object} deadletter - queue for requests that exhausted their attempts
   */
  constructor(priority, normal, deadletter) {
    this.priority = priority;
    this.normal = normal;
    this.deadletter = deadletter;
    this.allQueues = [priority, normal, deadletter];
  }

  /** Push one or more requests onto the priority queue. */
  pushPriority(requests) {
    return this.push(requests, this.priority);
  }

  /** Push one or more requests onto the given queue (normal by default). */
  push(requests, queue = this.normal) {
    return queue.push(requests);
  }

  /** Push one or more requests onto the deadletter queue. */
  pushDead(requests) {
    return this.push(requests, this.deadletter);
  }

  /**
   * Requeue a new request on the queue the original request was popped from.
   * Falls back to the normal queue when the original carries no recorded
   * origin (matches the default the pre-queueset code used), instead of
   * crashing on an undefined queue.
   */
  repush(original, newRequest) {
    return this.push(newRequest, original._originQueue || this.normal);
  }

  /** Subscribe to every queue in the set in parallel. */
  subscribe() {
    return Q.all(this.allQueues.map(queue => queue.subscribe()));
  }

  /** Unsubscribe from every queue in the set in parallel. */
  unsubscribe() {
    return Q.all(this.allQueues.map(queue => queue.unsubscribe()));
  }

  /**
   * Pop the next request, preferring the priority queue over the normal one.
   * The returned request (if any) is tagged with the queue it came from so it
   * can later be done()'d, abandon()'d or repush()'d on the right queue.
   */
  pop() {
    return Q()
      .then(this._pop.bind(this, this.priority))
      .then(this._pop.bind(this, this.normal));
  }

  // Pop from the given queue only if an earlier stage in the chain did not
  // already yield a request; record the origin queue on first assignment.
  _pop(queue, request = null) {
    return Q.try(() => {
      return request ? request : queue.pop();
    }).then(result => {
      if (result && !result._originQueue) {
        result._originQueue = queue;
      }
      return result;
    });
  }

  /** Mark a request complete on its origin queue (no-op if none recorded). */
  done(request) {
    return request._originQueue ? request._originQueue.done(request) : Q();
  }

  /** Return a request to its origin queue (no-op if none recorded). */
  abandon(request) {
    return request._originQueue ? request._originQueue.abandon(request) : Q();
  }
}

module.exports = QueueSet;

Просмотреть файл

@ -4,6 +4,18 @@ class Request {
constructor(type, url) {
this.type = type;
this.url = url;
this.promises = [];
}
track(promises) {
if (!promises) {
return;
}
if (Array.isArray(promises)) {
Array.prototype.push.apply(this.promises, promises);
} else {
this.promises.push(promises);
}
}
addMeta(data) {
@ -36,17 +48,17 @@ class Request {
links[name] = { type: 'siblings', href: href };
}
queue(type, url, context, queue = null) {
queue(type, url, context) {
const newRequest = new Request(type, url);
newRequest.context = context;
this.crawler.queue(this, newRequest, queue);
this.track(this.crawler.queue(newRequest));
}
queueRoot(type, url, force = false) {
const newRequest = new Request(type, url);
newRequest.context = { qualifier: 'urn:' };
newRequest.force = force;
this.crawler.queue(this, newRequest);
this.track(this.crawler.queue(newRequest));
}
queueRoots(type, url, force = false) {
@ -55,7 +67,7 @@ class Request {
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
newRequest.force = force;
this.crawler.queue(this, newRequest);
this.track(this.crawler.queue(newRequest));
}
queueChild(type, url, qualifier) {
@ -63,7 +75,7 @@ class Request {
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
newRequest.force = this.force;
this.crawler.queue(this, newRequest);
this.track(this.crawler.queue(newRequest));
}
queueChildren(type, url, context = null) {
@ -72,7 +84,7 @@ class Request {
newContext.qualifier = this.document._metadata.links.self.href;
newRequest.context = newContext;
newRequest.force = this.force;
this.crawler.queue(this, newRequest);
this.track(this.crawler.queue(newRequest));
}
markSkip(outcome, message) {

Просмотреть файл

@ -4,6 +4,7 @@ const Crawler = require('../lib/crawler');
const expect = require('chai').expect;
const extend = require('extend');
const Q = require('q');
const QueueSet = require('../lib/queueSet');
const Request = require('../lib/request');
const sinon = require('sinon');
@ -11,12 +12,13 @@ describe('Crawler get request', () => {
it('should get from the priority queue first', () => {
const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
expect(request.type).to.be.equal('priority');
expect(request.originQueue === priority).to.be.true;
expect(request._originQueue === queues.priority).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.crawlerName).to.be.equal('test');
expect(request).to.be.equal(requestBox[0]);
@ -26,12 +28,13 @@ describe('Crawler get request', () => {
it('should get from the normal queue if no priority', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { return Q(new Request('normal', 'http://test')); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q('locked'); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
expect(request.type).to.be.equal('normal');
expect(request.originQueue === normal).to.be.true;
expect(request._originQueue === queues.normal).to.be.true;
expect(request.lock).to.be.equal('locked');
expect(request.crawlerName).to.be.equal('test');
expect(request).to.be.equal(requestBox[0]);
@ -41,7 +44,8 @@ describe('Crawler get request', () => {
it('should return a dummy skip/delay request if none are queued', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(request => {
expect(request.type).to.be.equal('_blank');
@ -56,7 +60,8 @@ describe('Crawler get request', () => {
it('should throw when normal pop errors', () => {
const priority = createBaseQueue({ pop: () => { return Q(null); } });
const normal = createBaseQueue({ pop: () => { throw new Error('normal test'); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
request => assert.fail(),
@ -67,7 +72,8 @@ describe('Crawler get request', () => {
it('should throw when priority pop errors', () => {
const priority = createBaseQueue({ pop: () => { throw new Error('priority test'); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority });
const queues = createBaseQueues({ priority: priority, normal: normal });
const crawler = createBaseCrawler({ queues: queues });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
request => assert.fail(),
@ -78,8 +84,9 @@ describe('Crawler get request', () => {
it('should throw when acquire lock errors', () => {
const priority = createBaseQueue({ pop: () => { return Q(new Request('priority', 'http://test')); } });
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { throw new Error('locker error'); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
request => assert.fail(),
@ -97,8 +104,9 @@ describe('Crawler get request', () => {
}
});
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
request => assert.fail(),
@ -115,8 +123,9 @@ describe('Crawler get request', () => {
abandon: request => { throw new Error('Abandon error'); }
});
const normal = createBaseQueue({ pop: () => { return Q(null); } });
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker({ lock: () => { return Q.reject(new Error('locker error')); } });
const crawler = createBaseCrawler({ normal: normal, priority: priority, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
const requestBox = [];
return crawler._getRequest(requestBox, 'test').then(
request => assert.fail(),
@ -447,10 +456,10 @@ describe('Crawler queue', () => {
const config = { orgFilter: new Set(['test']) };
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
request = { promises: [] };
const crawler = createBaseCrawler({ normal: normal, options: config });
crawler.queue(request, newRequest);
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
crawler.queue(request);
expect(request.promises.length).to.be.equal(0);
expect(queue.length).to.be.equal(0);
});
@ -459,30 +468,31 @@ describe('Crawler queue', () => {
const config = { orgFilter: new Set(['microsoft']) };
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
request = { promises: [] };
const crawler = createBaseCrawler({ normal: normal, options: config });
crawler.queue(request, newRequest);
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
request.track(crawler.queue(request));
expect(request.promises.length).to.be.equal(1);
expect(queue.length).to.be.equal(1);
expect(queue[0] !== newRequest).to.be.true;
expect(queue[0].type === newRequest.type).to.be.true;
expect(queue[0].url === newRequest.url).to.be.true;
expect(queue[0] !== request).to.be.true;
expect(queue[0].type === request.type).to.be.true;
expect(queue[0].url === request.url).to.be.true;
});
// TODO
it('should queue in supplied queue', () => {
const config = { orgFilter: new Set(['microsoft']) };
const queue = [];
const supplied = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const newRequest = new Request('repo', 'http://api.github.com/repo/microsoft/test');
request = { promises: [] };
const crawler = createBaseCrawler({ options: config });
crawler.queue(request, newRequest, supplied);
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const request = new Request('repo', 'http://api.github.com/repo/microsoft/test');
const crawler = createBaseCrawler({ queues: queues, options: config });
request.track(crawler.queue(request));
expect(request.promises.length).to.be.equal(1);
expect(queue.length).to.be.equal(1);
expect(queue[0] !== newRequest).to.be.true;
expect(queue[0].type === newRequest.type).to.be.true;
expect(queue[0].url === newRequest.url).to.be.true;
expect(queue[0] !== request).to.be.true;
expect(queue[0].type === request.type).to.be.true;
expect(queue[0].url === request.url).to.be.true;
});
});
@ -497,12 +507,12 @@ describe('Crawler requeue', () => {
it('should requeue in same queue as before', () => {
const queue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const crawler = createBaseCrawler({ normal: normal });
const queues = createBaseQueues({ normal: normal });
const crawler = createBaseCrawler({ queues: queues });
for (let i = 0; i < 5; i++) {
const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
request.markRequeue();
request.promises = [];
request.originQueue = normal;
request._originQueue = normal;
request.attemptCount = i === 0 ? null : i;
crawler._requeue(request);
expect(request.promises.length).to.be.equal(1);
@ -518,23 +528,23 @@ describe('Crawler requeue', () => {
it('should requeue in deadletter queue after 5 attempts', () => {
const queue = [];
const deadLetterQueue = [];
const deadletterQueue = [];
const normal = createBaseQueue({ push: request => { queue.push(request); return Q(); } });
const deadLetter = createBaseQueue({ push: request => { deadLetterQueue.push(request); return Q(); } });
const deadletter = createBaseQueue({ push: request => { deadletterQueue.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal, deadletter: deadletter });
const request = new Request('test', 'http://api.github.com/repo/microsoft/test');
request.promises = [];
request.attemptCount = 5;
request.markRequeue();
request.originQueue = normal;
const crawler = createBaseCrawler({ normal: normal, deadLetter: deadLetter });
request._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues });
crawler._requeue(request);
expect(request.promises.length).to.be.equal(1);
expect(queue.length).to.be.equal(0);
expect(deadLetterQueue.length).to.be.equal(1);
expect(deadLetterQueue[0] !== request).to.be.true;
expect(deadLetterQueue[0].type === request.type).to.be.true;
expect(deadLetterQueue[0].url === request.url).to.be.true;
expect(deadLetterQueue[0].attemptCount).to.be.equal(6);
expect(deadletterQueue.length).to.be.equal(1);
expect(deadletterQueue[0] !== request).to.be.true;
expect(deadletterQueue[0].type === request.type).to.be.true;
expect(deadletterQueue[0].url === request.url).to.be.true;
expect(deadletterQueue[0].attemptCount).to.be.equal(6);
});
});
@ -543,12 +553,12 @@ describe('Crawler complete request', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
originalRequest._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(request => {
expect(request === originalRequest).to.be.true;
expect(request.lock).to.be.null;
@ -567,13 +577,13 @@ describe('Crawler complete request', () => {
push: request => { queue.push(request); return Q(); },
done: request => { done.push(request); return Q(); }
});
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
originalRequest.markRequeue();
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
originalRequest._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(request => {
expect(request === originalRequest).to.be.true;
expect(request.lock).to.be.null;
@ -592,12 +602,12 @@ describe('Crawler complete request', () => {
it('should do all right things for requests with no url', () => {
const done = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const originalRequest = new Request('test', null);
originalRequest.markRequeue();
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal });
originalRequest._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues });
return crawler._completeRequest(originalRequest).then(request => {
expect(request === originalRequest).to.be.true;
expect(done.length).to.be.equal(1);
@ -616,6 +626,7 @@ describe('Crawler complete request', () => {
return Q();
}
});
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({
unlock: request => {
if (!promiseValue[0]) assert.fail();
@ -625,9 +636,9 @@ describe('Crawler complete request', () => {
});
const originalRequest = new Request('test', 'http://test.com');
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest._originQueue = normal;
originalRequest.promises = [Q.delay(1).then(() => promiseValue[0] = 13)];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => {
expect(request === originalRequest).to.be.true;
@ -645,12 +656,13 @@ describe('Crawler complete request', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest._originQueue = normal;
originalRequest.promises = [Q.reject(13)];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => assert.fail(),
error => {
@ -665,12 +677,12 @@ describe('Crawler complete request', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: request => { done.push(request); return Q(); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: () => { throw new Error('sigh'); } });
const originalRequest = new Request('test', 'http://test.com');
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
originalRequest._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => {
expect(request === originalRequest).to.be.true;
@ -686,12 +698,12 @@ describe('Crawler complete request', () => {
const done = [];
const unlock = [];
const normal = createBaseQueue({ done: () => { throw new Error('sigh'); } });
const queues = createBaseQueues({ normal: normal });
const locker = createBaseLocker({ unlock: request => { unlock.push(request); return Q(); } });
const originalRequest = new Request('test', 'http://test.com');
originalRequest.lock = 42;
originalRequest.originQueue = normal;
originalRequest.promises = [];
const crawler = createBaseCrawler({ normal: normal, locker: locker });
originalRequest._originQueue = normal;
const crawler = createBaseCrawler({ queues: queues, locker: locker });
return crawler._completeRequest(originalRequest).then(
request => assert.fail(),
error => {
@ -890,13 +902,13 @@ describe('Crawler whole meal deal', () => {
const crawler = createFullCrawler();
sinon.stub(crawler, '_startNext', () => Q());
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.normalQueue.pop.callCount).to.be.equal(1, 'normal call count');
expect(crawler.queues.priority.pop.callCount).to.be.equal(1, 'priority call count');
expect(crawler.queues.normal.pop.callCount).to.be.equal(1, 'normal call count');
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1, 'lock call count');
@ -925,13 +937,14 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
});
});
it('should empty request queues', () => {
// TODO
});
it('should handle getRequest reject', () => {
@ -943,14 +956,14 @@ describe('Crawler whole meal deal', () => {
sinon.stub(normal, 'pop', () => { throw Error('cant pop') });
sinon.stub(normal, 'push', request => { return Q(); });
sinon.spy(normal, 'done');
crawler.normalQueue = normal;
crawler.queues.normal = normal;
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(0);
@ -961,7 +974,7 @@ describe('Crawler whole meal deal', () => {
const requestorGet = crawler.requestor.get;
expect(requestorGet.callCount).to.be.equal(0);
const push = crawler.normalQueue.push;
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(0);
const upsert = crawler.store.upsert;
@ -970,7 +983,7 @@ describe('Crawler whole meal deal', () => {
const unlock = crawler.locker.unlock;
expect(unlock.callCount).to.be.equal(0);
expect(crawler.normalQueue.done.callCount).to.be.equal(0);
expect(crawler.queues.normal.done.callCount).to.be.equal(0);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -983,13 +996,13 @@ describe('Crawler whole meal deal', () => {
sinon.stub(crawler, '_startNext', () => Q());
// setup a good request but a server error response
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse(null, 500)];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
@ -1004,7 +1017,7 @@ describe('Crawler whole meal deal', () => {
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const push = crawler.normalQueue.push;
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1017,7 +1030,7 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -1030,13 +1043,13 @@ describe('Crawler whole meal deal', () => {
sinon.stub(crawler, '_startNext', () => Q());
crawler.processor = { process: () => { throw new Error('bad processor') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
expect(crawler.priorityQueue.pop.callCount).to.be.equal(1);
expect(crawler.normalQueue.pop.callCount).to.be.equal(1);
expect(crawler.queues.priority.pop.callCount).to.be.equal(1);
expect(crawler.queues.normal.pop.callCount).to.be.equal(1);
const lock = crawler.locker.lock;
expect(lock.callCount).to.be.equal(1);
@ -1051,7 +1064,7 @@ describe('Crawler whole meal deal', () => {
expect(requestorGet.callCount).to.be.equal(1);
expect(requestorGet.getCall(0).args[0]).to.be.equal('http://test.com/users/user1');
const push = crawler.normalQueue.push;
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1064,7 +1077,7 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
expect(crawler.logger.error.callCount).to.be.equal(1);
const error = crawler.logger.error.getCall(0).args[0];
@ -1077,7 +1090,7 @@ describe('Crawler whole meal deal', () => {
sinon.stub(crawler, '_startNext', () => Q());
crawler.store = { upsert: () => { throw new Error('bad upsert') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
@ -1086,9 +1099,9 @@ describe('Crawler whole meal deal', () => {
expect(unlock.callCount).to.be.equal(1);
expect(unlock.getCall(0).args[0]).to.be.equal('lockToken');
expect(crawler.normalQueue.done.callCount).to.be.equal(1);
expect(crawler.queues.normal.done.callCount).to.be.equal(1);
const push = crawler.normalQueue.push;
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1105,12 +1118,12 @@ describe('Crawler whole meal deal', () => {
sinon.stub(crawler, '_startNext', () => Q());
crawler.locker = { unlock: () => { throw new Error('bad unlock') } };
crawler.normalQueue.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.queues.normal.requests = [new Request('user', 'http://test.com/users/user1')];
crawler.requestor.responses = [createResponse({ id: 42, repos_url: 'http://test.com/users/user1/repos' })];
return Q.try(() => {
return crawler.start('test');
}).then(() => {
const push = crawler.normalQueue.push;
const push = crawler.queues.normal.push;
expect(push.callCount).to.be.equal(1);
const newRequest = push.getCall(0).args[0];
expect(newRequest.type).to.be.equal('user');
@ -1134,6 +1147,8 @@ function createFullCrawler() {
sinon.stub(normal, 'push', request => { return Q(); });
sinon.spy(normal, 'done');
const queues = createBaseQueues({ priority: priority, normal: normal });
const locker = createBaseLocker();
sinon.stub(locker, 'lock', request => { return Q('lockToken'); });
sinon.stub(locker, 'unlock', request => { return Q(); });
@ -1158,7 +1173,7 @@ function createFullCrawler() {
const config = [];
const result = createBaseCrawler({ normal: normal, priority: priority, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
const result = createBaseCrawler({ queues: queues, requestor: requestor, store: store, logger: logger, locker: locker, options: config });
result.processor = processor;
return result;
}
@ -1211,8 +1226,12 @@ function createLinkHeader(target, previous, next, last) {
return [firstLink, prevLink, nextLink, lastLink].filter(value => { return value !== null; }).join(',');
}
function createBaseCrawler({normal = createBaseQueue(), priority = createBaseQueue(), deadLetter = createBaseQueue(), store = createBaseStore(), locker = createBaseLocker, requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
return new Crawler(normal, priority, deadLetter, store, locker, requestor, options, logger);
/**
 * Build a Crawler wired with stub collaborators, any of which may be overridden.
 * @param {object} [options] - destructured overrides
 * @param {QueueSet} [options.queues] - queue set (priority/normal/deadletter stubs by default)
 * @param {object} [options.store] - document store stub
 * @param {object} [options.locker] - distributed-lock stub
 * @param {object} [options.requestor] - HTTP requestor stub
 * @param {object} [options.options] - crawler config (promise tracing off by default)
 * @param {object} [options.logger] - logger stub
 * @returns {Crawler} a crawler built from the supplied or default collaborators
 */
// BUG FIX: the default was `locker = createBaseLocker` (no parentheses), which
// passed the factory FUNCTION itself to the Crawler instead of a locker object.
// Every sibling default invokes its factory; this one must too.
function createBaseCrawler({ queues = createBaseQueues(), store = createBaseStore(), locker = createBaseLocker(), requestor = createBaseRequestor(), options = { promiseTrace: false }, logger = createBaseLog() } = {}) {
  return new Crawler(queues, store, locker, requestor, options, logger);
}
/**
 * Assemble a QueueSet from the given queues, substituting a fresh stub queue
 * for any of the three (priority, normal, deadletter) that is not provided.
 * @param {object} [options] - destructured queue overrides, each defaulting to null
 * @returns {QueueSet} queue set containing the supplied or stubbed queues
 */
function createBaseQueues({ priority = null, normal = null, deadletter = null } = {}) {
  // Fall back to a default stub for any queue the caller left out (or passed falsy).
  const orDefault = (queue) => queue || createBaseQueue();
  return new QueueSet(orDefault(priority), orDefault(normal), orDefault(deadletter));
}
function createBaseQueue({ pop = null, push = null, done = null, abandon = null} = {}) {