This commit is contained in:
William Bartholomew 2016-11-15 14:11:58 -08:00
Родитель 7e26cfe971
Коммит dbe9ac4e13
1 изменённый файл: 47 добавлений и 32 удаления

Просмотреть файл

@ -18,21 +18,37 @@ class Crawler {
this.processor = new Processor();
}
log(fn) {
return Q.try(() => {
this.logger.verbose(`Enter: ${fn.name}`);
return fn();
}).then(
result => {
this.logger.verbose(`Success: ${fn.name}`);
return result;
},
error => {
this.logger.error(`Error: ${fn.name}`, error);
return Q.reject(error);
}
);
}
start(name) {
let requestBox = [];
return Q()
.then(this._getRequest.bind(this, requestBox, name))
.then(this._filter.bind(this))
.then(this._fetch.bind(this))
.then(this._convertToDocument.bind(this))
.then(this._processDocument.bind(this))
.then(this._storeDocument.bind(this))
.catch(this._errorHandler.bind(this, requestBox))
.then(this._completeRequest.bind(this), this._completeRequest.bind(this))
.catch(this._errorHandler.bind(this, requestBox))
.then(this._logOutcome.bind(this))
.then(this._startNext.bind(this, name), this._startNext.bind(this, name));
.then(this.log(this._getRequest.bind(this, requestBox, name)))
.then(this.log(this._filter.bind(this)))
.then(this.log(this._fetch.bind(this)))
.then(this.log(this._convertToDocument.bind(this)))
.then(this.log(this._processDocument.bind(this)))
.then(this.log(this._storeDocument.bind(this)))
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._completeRequest.bind(this), this._completeRequest.bind(this)))
.catch(this.log(this._errorHandler.bind(this, requestBox)))
.then(this.log(this._logOutcome.bind(this)))
.then(this.log(this._startNext.bind(this, name), this._startNext.bind(this, name)));
}
_errorHandler(requestBox, error) {
@ -48,24 +64,23 @@ class Crawler {
_getRequest(requestBox, name) {
const self = this;
return Q.try(() => {
return this._pop(this.priorityQueue)
.then(this._pop.bind(this, this.normalQueue))
.then(request => {
if (!request) {
request = new Request('_blank', null);
request.markDelay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
request.start = Date.now();
request.crawler = self;
request.crawlerName = name;
requestBox[0] = request;
request.promises = [];
return request;
});
})
.then(this._acquireLock.bind(this));
return Q()
.then(self.log(self._pop.bind(self, self.priorityQueue)))
.then(self.log(self._pop.bind(self, self.normalQueue)))
.then(request => {
if (!request) {
request = new Request('_blank', null);
request.markDelay();
request.markSkip('Exhausted queue', `Waiting 1 second`);
}
request.start = Date.now();
request.crawler = self;
request.crawlerName = name;
requestBox[0] = request;
request.promises = [];
return request;
})
.then(self.log(self._acquireLock.bind(self)));
}
_pop(queue, request = null) {
@ -85,7 +100,7 @@ class Crawler {
}
const self = this;
return Q.try(() => {
return self.locker.lock(request.url, 5 * 60 * 1000);
return this.log(self.locker.lock(request.url, 5 * 60 * 1000));
}).then(
lock => {
request.lock = lock;
@ -95,7 +110,7 @@ class Crawler {
// If we could not acquire a lock, abandon the request so it will be returned to the queue.
// If that fails, throw the original error
return Q.try(() => {
request.originQueue.abandon(request);
this.log(request.originQueue.abandon(request));
}).finally(() => { throw error; });
});
}
@ -244,7 +259,7 @@ class Crawler {
}
_deleteFromQueue(request) {
return request.originQueue.done(request).then(() => { return request });
return request.originQueue.done(request).then(() => request);
}
_logOutcome(request) {