diff --git a/lib/crawler.js b/lib/crawler.js
index 5e3ddd0..c021651 100644
--- a/lib/crawler.js
+++ b/lib/crawler.js
@@ -92,21 +92,30 @@ class Crawler {
   }
 
   _completeRequest(request) {
+    // requeue the request if needed then wait for any accumulated promises and release the lock and clean up the queue
+    this._requeue(request);
     const self = this;
-    return this._releaseLock(request).finally(() => {
-      if (request.shouldRequeue()) {
-        request.attemptCount = request.attemptCount || 1;
-        if (++request.attemptCount > 5) {
-          self.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
-          self.queue(request, request, self.deadletterQueue);
-        } else {
-          request.addMeta({ attempt: request.attemptCount });
-          self.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
-          self.queue(request, request, request.originQueue);
-        }
-      }
-      return self._deleteFromQueue(request);
-    });
+    return Q.all(request.promises)
+      .finally(() => self._releaseLock(request))
+      .finally(() => self._deleteFromQueue(request))
+      .then(() => { return request; });
+  }
+
+  // If the request asks to be requeued, bump its attempt count and queue it again: on its origin
+  // queue, or on the deadletter queue once the incremented attempt count exceeds 5.
+  _requeue(request) {
+    if (!request.shouldRequeue()) {
+      return;
+    }
+    request.attemptCount = request.attemptCount || 1;
+    if (++request.attemptCount > 5) {
+      this.logger.warn(`Exceeded attempt count for ${request.type} ${request.url}`);
+      this.queue(request, request, this.deadletterQueue);
+    } else {
+      request.addMeta({ attempt: request.attemptCount });
+      this.logger.verbose(`Requeuing attempt ${request.attemptCount} of request ${request.type} for ${request.url}`);
+      this.queue(request, request, request.originQueue);
+    }
   }
 
   _pop(queue, request = null) {