diff --git a/.vscode/launch.json b/.vscode/launch.json index 34c6414..4b7d830 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -127,7 +127,7 @@ "CRAWLER_MODE": "Standard", "CRAWLER_EVENT_PROVIDER": "none", "CRAWLER_OPTIONS_PROVIDER": "redis", - "DEBUG.off": "amqp10:client,amqp10:link:receiver" + "NODE_TLS_REJECT_UNAUTHORIZED": "0" }, "console": "internalConsole", "sourceMaps": false, diff --git a/lib/crawler.js b/lib/crawler.js index a8e3409..0e8dd90 100644 --- a/lib/crawler.js +++ b/lib/crawler.js @@ -234,7 +234,7 @@ class Crawler { return self._requeue(request); }) .catch(error => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): catch force requeue`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): catch force requeue`); self.logger.error(error); throw error; }) @@ -247,7 +247,7 @@ class Crawler { return self._abandonInQueue(request); }) .then(() => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): exit (success - force requeue)`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): exit (success - force requeue)`); return request; }); } @@ -259,39 +259,39 @@ class Crawler { const loggingPromise = originalPromise.then(result => { completedPromises++; - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): completed ${completedPromises} of ${trackedPromises.length} promises (${failedPromises} failed)`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): completed ${completedPromises} of ${trackedPromises.length} promises (${failedPromises} failed)`); return result; }, error => { completedPromises++; failedPromises++; - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): completed ${completedPromises} of ${trackedPromises.length} promises (${failedPromises} failed)`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): completed ${completedPromises} of ${trackedPromises.length} promises (${failedPromises} failed)`); throw error; }); } - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): ${trackedPromises.length} tracked promises`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): ${trackedPromises.length} tracked promises`); const completeWork = Q.all(trackedPromises).then( () => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): resolved tracked promises`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): resolved tracked promises`); return self._releaseLock(request).then( () => { return self._deleteFromQueue(request); }, error => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): catch release lock`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): catch release lock`); self.logger.error(error); return self._abandonInQueue(request); }); }, error => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): catch tracked promises`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): catch tracked promises`); self.logger.error(error); return self._completeRequest(request, true); }); return completeWork.then(() => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): exit (success)`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): exit (success)`); return request; }).catch(error => { - debug(`_completeRequest(${request.meta.loopName}:${request.toUniqueString()}): catch completeWork`); + debug(`_completeRequest(${loopName}:${request.toUniqueString()}): catch completeWork`); throw error; }); } diff --git a/lib/request.js b/lib/request.js index 84208b5..b89b4ff 100644 --- a/lib/request.js +++ b/lib/request.js @@ -42,6 +42,8 @@ class Request { this.start = Date.now(); this.context = this.context || {}; this._addHistory(); + const root = this.context.history.length <= 1 ? 'self' : this.context.history[0]; + this.addMeta({ root: root }); this._resolvePolicy(); return this; } @@ -61,9 +63,9 @@ class Request { } } - _addHistory() { + _addHistory(request = null) { this.context.history = this.context.history || []; - this.context.history.push(this.toString()); + this.context.history.push((request || this).toString()); } hasSeen(request) { @@ -246,7 +248,7 @@ class Request { } toString() { - return `${this.type}@${this.url}`; + return `${this.type}@${this._trimUrl(this.url)}`; } toUniqueString() { @@ -254,6 +256,10 @@ class Request { return `${this.type}@${this.url}:${policyName}`; } + _trimUrl(url) { + return url ? url.replace('https://api.github.com', '') : ''; + } + _log(level, message, meta = null) { if (this.crawler) { this.crawler.logger.log(level, message, meta); diff --git a/providers/fetcher/githubProcessor.js b/providers/fetcher/githubProcessor.js index ea03b9a..049ed5f 100644 --- a/providers/fetcher/githubProcessor.js +++ b/providers/fetcher/githubProcessor.js @@ -1,11 +1,12 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. +const extend = require('extend'); const moment = require('moment'); const parse = require('parse-link-header'); -const Request = require('../../lib/request'); const Q = require('q'); const qlimit = require('qlimit'); +const Request = require('../../lib/request'); const TraversalPolicy = require('../../lib/traversalPolicy'); const URL = require('url'); const uuid = require('node-uuid'); @@ -63,7 +64,8 @@ class GitHubProcessor { for (let i = 2; i <= links.last.page; i++) { const separator = request.url.includes('?') ? '&' : '?'; const url = request.url + `${separator}page=${i}&per_page=100`; - const newRequest = new Request(request.type, url, request.context); + const newContext = extend(true, {}, request.context); + const newRequest = new Request(request.type, url, newContext); // Carry this request's transitivity forward to the other pages. newRequest.policy = request.policy; requests.push(newRequest); @@ -108,7 +110,7 @@ class GitHubProcessor { // TODO if there is no elementType on a collection then assume it is events. Need to fix this up and // formalize the model of collections where the request carries the payload. const baseUrl = request.url.split("?")[0]; - const newContext = { history: request.context.history }; + const newContext = extend(true, {}, { history: request.context.history }); const newRequest = new Request(item.type, `${baseUrl}/${item.id}`, newContext); newRequest.payload = { etag: 1, body: item }; newRequest.policy = request.policy; @@ -440,7 +442,8 @@ class GitHubProcessor { // is in the payload), it will need to be unique for the queue tagging/optimization // Events are immutable (and we can't fetch them later) so set the etag to a constant const baseUrl = request.url.split("?")[0]; - const newRequest = new Request(event.type, `${baseUrl}/${event.id}`); + const newContext = extend(true, {}, { history: request.context.history }); + const newRequest = new Request(event.type, `${baseUrl}/${event.id}`, newContext); newRequest.policy = TraversalPolicy.event(event.type); newRequest.payload = { etag: 1, body: event }; return newRequest; @@ -736,7 +739,8 @@ class GitHubProcessor { qualifier = qualifier || (repo ? `urn:repo:${repo}` : 'urn:'); const separator = qualifier.endsWith(':') ? '' : ':'; request.linkResource(name, `${qualifier}${separator}${type}:${payload[name].id}`); - const newRequest = new Request(type, payload[name].url, { qualifier: qualifier }); + const newContext = extend(true, {}, { history: request.context.history, qualifier: qualifier }); + const newRequest = new Request(type, payload[name].url, newContext); newRequest.policy = request.getNextPolicy(name); if (newRequest.policy) { request.queueRequests(newRequest);