This commit is contained in:
William Bartholomew 2016-11-11 16:46:31 -08:00
Родитель 3e50460fd0
Коммит 5ed3e0d868
3 изменённых файлов: 52 добавлений и 25 удалений

Просмотреть файл

@ -5,11 +5,12 @@ const Q = require('q');
const URL = require('url');
class Crawler {
constructor(queue, priorityQueue, deadletterQueue, store, requestor, config, logger) {
constructor(queue, priorityQueue, deadletterQueue, store, locker, requestor, config, logger) {
this.normalQueue = queue;
this.priorityQueue = priorityQueue;
this.deadletterQueue = deadletterQueue;
this.store = store;
this.locker = locker;
this.requestor = requestor;
this.config = config;
this.logger = logger;
@ -26,10 +27,10 @@ class Crawler {
.then(this._processDocument.bind(this))
.then(this._storeDocument.bind(this))
.catch(this._errorHandler.bind(this, requestBox))
.then(this._completeRequest.bind(this))
.then(this._completeRequest.bind(this), this._completeRequest.bind(this))
.catch(this._errorHandler.bind(this, requestBox))
.then(this._logOutcome.bind(this))
.finally(this._startNext.bind(this));
.then(this._startNext.bind(this), this._startNext.bind(this));
}
_errorHandler(requestBox, error) {
@ -54,22 +55,51 @@ class Crawler {
request.start = Date.now();
requestBox[0] = request;
return request;
})
.then(this._acquireLock.bind(this));
}
_acquireLock(request) {
if (request.url) {
return this.locker.lock(request.url, 5 * 60 * 1000).then((lock) => {
request.lock = lock;
return request;
}, error => {
return request.originQueue.abandon(request).finally(() => {
throw error;
});
});
}
}
_releaseLock(request) {
if (request.lock) {
return this.locker.unlock(request.lock)
.then(() => {
return request;
}, error => {
this.logger.error(error);
return request;
});
}
return Q(request);
}
_completeRequest(request) {
if (request.shouldRequeue()) {
request.attemptCount = request.attemptCount || 1;
if (++request.attemptCount > 5) {
this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
this.queue(request, request, this.deadletterQueue);
} else {
request.addMeta({ attempt: request.attemptCount });
this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
this.queue(request, request, request.originQueue);
return this._releaseLock(request).finally(() => {
if (request.shouldRequeue()) {
request.attemptCount = request.attemptCount || 1;
if (++request.attemptCount > 5) {
this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
this.queue(request, request, this.deadletterQueue);
} else {
request.addMeta({ attempt: request.attemptCount });
this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
this.queue(request, request, request.originQueue);
}
}
}
return this._deleteFromQueue(request);
return this._deleteFromQueue(request);
})
}
_pop(queue, request = null) {
@ -84,7 +114,6 @@ class Crawler {
}
_startNext(request) {
this.requestBox[0] = null;
const delay = request.shouldDelay() ? 1000 : 0;
setTimeout(this.start.bind(this), delay);
}

Просмотреть файл

@ -13,7 +13,7 @@ class Processor {
const links = parse(linkHeader);
for (let i = 2; i <= links.last.page; i++) {
const url = request.url + `?page=${i}&per_page=100`;
const newRequest = Request.create('page', url);
const newRequest = new Request('page', url);
newRequest.force = request.force;
newRequest.page = i;
newRequest.subType = request.subType;

Просмотреть файл

@ -1,11 +1,9 @@
const extend = require('extend');
class Request {
static create(type, url) {
const result = new Request();
result.type = type;
result.url = url;
return result;
constructor(type, url) {
this.type = type;
this.url = url;
}
addMeta(data) {
@ -32,17 +30,17 @@ class Request {
}
queue(type, url, context, queue = null) {
const newRequest = Request.create(type, url);
const newRequest = new Request(type, url);
newRequest.context = context;
this.crawler.queue(this, newRequest, queue);
}
queueRoot(type, url) {
this.crawler.queue(this, Request.create(type, url));
this.crawler.queue(this, new Request(type, url));
}
queueChild(type, url, qualifier) {
const newRequest = Request.create(type, url);
const newRequest = new Request(type, url);
newRequest.context = this.context || {};
newRequest.context.qualifier = qualifier;
if (this.force) {
@ -52,7 +50,7 @@ class Request {
}
queueChildren(type, url, context = null) {
const newRequest = Request.create(type, url);
const newRequest = new Request(type, url);
const newContext = extend(this.context || {}, context);
newRequest.context = newContext;
newContext.qualifier = this.document._metadata.links.self.href;