diff --git a/.vscode/launch.json b/.vscode/launch.json index c2f9b14..78588fb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,6 +1,25 @@ { "version": "0.2.0", "configurations": [ + { + "name": "Run mocha", + "type": "node", + "request": "launch", + "program": "${workspaceRoot}/node_modules/mocha/bin/_mocha", + "stopOnEntry": false, + "args": [ + "test/eventFinderTests.js" + ], + "cwd": "${workspaceRoot}", + "runtimeExecutable": null, + "runtimeArgs": [ + "--nolazy" + ], + "env": { + "NODE_ENV": "development" + }, + "console": "internalConsole" + }, { "name": "Attach", "type": "node", diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 670bd97..0000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,6 +0,0 @@ -// Place your settings in this file to overwrite default and user settings. -{ - "jshint.options": { - "esnext": true - } -} \ No newline at end of file diff --git a/index.js b/index.js new file mode 100644 index 0000000..23929ce --- /dev/null +++ b/index.js @@ -0,0 +1,4 @@ +module.exports.serviceBusQueue = require('./lib/serviceBusQueue'); +module.exports.eventFinder = require('./lib/eventFinder'); +module.exports.webhookReceiver = require('./lib/WebhookDriver'); +module.exports.crawler = require('./lib/crawler'); diff --git a/lib/crawler.js b/lib/crawler.js index 1424dbe..8180884 100644 --- a/lib/crawler.js +++ b/lib/crawler.js @@ -7,7 +7,6 @@ class Crawler { this.queue = queue; this.store = store; this.requestor = requestor; - this.config = config; this.logger = logger; } @@ -43,7 +42,7 @@ class Crawler { return Q.resolve(request); } - return this._getRequestor().getAll(request.url) + this.requestor.getAll(crawlRequest.url, this._defaultOptions()).then( .then(githubResponse => { request.response = githubResponse; return request; @@ -77,6 +76,10 @@ class Crawler { } let document = null; switch (request.type) { + case 'event': { + document = self._processEvent(); + + } case 'orgs': { document = this._processCollection(request.response, 'login', request.context); break; @@ -156,9 +159,7 @@ class Crawler { } return this.queue.done(request).then(() => { return request; }); - } - - _markSeen(request) { + return { // TODO retryable vs non-retryable and re-queue this.seen[request.url] = true; return Q.resolve(request); @@ -166,10 +167,7 @@ class Crawler { _getRequestor() { return new this.requestor({ - headers: { - authorization: this.config.githubToken - } - }); + }; } _processCollection(document, type, context) { diff --git a/lib/eventFinder.js b/lib/eventFinder.js new file mode 100644 index 0000000..26536f4 --- /dev/null +++ b/lib/eventFinder.js @@ -0,0 +1,44 @@ +const Q = require('q'); +const async = require('async'); + +class EventFinder { + constructor(requestor, eventStore) { + this.requestor = requestor; + this.eventStore = eventStore; + } + + discoverAndQueue(eventSource, eventSink) { + if (!eventSource) { + return Q(null); + } + return this.getNewEvents(eventSource).then(events => { + self._queueEvents(events, eventSink); + }); + } + + _queueEvents(events, eventSink) { + events.forEach(event => { + eventSink.push({ type: 'event', url: event.url }); + }); + } + + getNewEvents(eventSource) { + const self = this; + return this.requestor.getAll(eventSource).then(self._findNew.bind(self)); + } + + _findNew(events, callback = null) { + const deferred = Q.defer(); + const realCallback = callback || ((err, value) => { + if (err) + deferred.reject(err); + else + deferred.resolve(value); + }); + async.filterLimit(events, 1, (event, cb) => { + this.eventStore.etag('event', event.url, (err, tag) => { cb(err, !tag); }); + }, realCallback); + return callback ? null : deferred.promise; + } +} +module.exports = EventFinder; \ No newline at end of file diff --git a/lib/serviceBusQueue.js b/lib/serviceBusQueue.js new file mode 100644 index 0000000..9551f50 --- /dev/null +++ b/lib/serviceBusQueue.js @@ -0,0 +1,27 @@ +const azure = require('azure'); + +const topicName = 'webhookevents'; +const subscriptionName = 'ghcrawlerdev'; +const notSoSmartTimeoutMilliseconds = 1000; + +class ServiceBusQueue { + + constructor(connectionSpec) { + this.bus = azure.createServiceBusService(connectionSpec); + } + + pop(handler) { + this.bus.receiveSubscriptionMessage(topicName, subscriptionName, { isPeekLock: true }, (peekError, message) => { + if (!message) { + // No messages found. Let's chill out for a little bit. + // Could use a smart retry system here based on how many chillout moments we have had lately. + return setTimeout(completion, notSoSmartTimeoutMilliseconds); + } + if (peekError) { + return; + } + return handler(message); + }); + } +} +module.exports = ServiceBusQueue; \ No newline at end of file diff --git a/lib/webhookDriver.js b/lib/webhookDriver.js new file mode 100644 index 0000000..3386560 --- /dev/null +++ b/lib/webhookDriver.js @@ -0,0 +1,39 @@ +const async = require('async'); + +const repoEvents = new Set(['issues', 'issue_comment', 'push', 'status']); +const orgEvents = new Set(['membership']); + +class WebhookDriver { + + static watch(queue, eventFinder, eventSink) { + async.whilst( + () => true, + (completion) => { + setTimeout(() => { WebhookDriver.handleNext(queue, eventFinder, eventSink, completion); }, 0); + }); + } + + static handleNext(queue, eventFinder, eventSink, completion) { + queue.pop(message => { + const source = WebhookDriver._chooseSource(message); + eventFinder.discoverAndQueue(source, eventSink).then( + () => { completion(); }, + (err) => { completion(err); }); + }); + } + + static _chooseSource(message) { + // TODO this top bit relies on service bus message structure + const type = message.customProperties.event; + const event = JSON.parse(message.body); + if (repoEvents.has(type)) { + const name = event.repository.full_name; + return `https://api.github.com/repos/${name}/events`; + } else if (orgEvents.has(type)) { + const name = event.organization.login.toLowercase(); + return `https://api.github.com/orgs/${name}/events`; + } + return null; + } +} +module.exports = WebhookDriver; \ No newline at end of file diff --git a/package.json b/package.json index 5735be3..a380549 100644 --- a/package.json +++ b/package.json @@ -2,10 +2,7 @@ "name": "ghcrawler", "version": "0.1.3", "description": "A robust GitHub API crawler that walks a queue of GitHub entities retrieving and storing their contents.", - "main": "./lib/crawler.js", - "bin": { - "ghcrawler": "./lib/crawler.js" - }, + "main": "./index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, @@ -26,10 +23,17 @@ "url": "https://github.com/microsoft/ghcrawler" }, "dependencies": { + "async": "^2.1.2", + "azure": "^1.2.0-preview", "moment": "2.15.2", "q": "1.4.1" }, - "devDependencies": {}, + "devDependencies": { + "chai": "^3.5.0", + "grunt": "^1.0.1", + "grunt-mocha-test": "^0.13.2", + "mocha": "^3.1.2" + }, "jshintConfig": { "esversion": "6" } diff --git a/test/eventFinderTests.js b/test/eventFinderTests.js new file mode 100644 index 0000000..9f5025a --- /dev/null +++ b/test/eventFinderTests.js @@ -0,0 +1,70 @@ +const assert = require('chai').assert; +const chai = require('chai'); +const expect = require('chai').expect; +const extend = require('extend'); +const finder = require('../lib/eventFinder.js'); +const Q = require('q'); + +describe('Event Finder', () => { + it('will find 1 document', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }] + ]; + const eventDocuments = [{ 'http://test2': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(1); + expect(found[0].url).to.be.equal('http://test1'); + }); + }); + it('will not find any documents', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }] + ]; + const eventDocuments = [{ 'http://test1': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(0); + }); + }); + it('will stop finding at first found document', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }, { url: 'http://test3' }] + ]; + const eventDocuments = [{ 'http://test2': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(1); + expect(found[0].url).to.be.equal('http://test1'); + }); + }); +}); + +function createFinder(events, documents) { + const eventStore = createStore(documents); + const requestor = createRequestor(events); + return new finder(requestor, eventStore); +} + +function createRequestor(pages) { + const result = {}; + result.getAll = () => { + return Q(pages.shift()); + }; + return result; +} + +function createStore(documents) { + const result = {}; + const hash = documents.reduce((collection, document) => { + extend(collection, document); + return collection; + }, {}); + result.etag = url => { + return hash[url]; + }; + return result; +} \ No newline at end of file diff --git a/test/eventReceiverTests.js b/test/eventReceiverTests.js new file mode 100644 index 0000000..5d0f73e --- /dev/null +++ b/test/eventReceiverTests.js @@ -0,0 +1,71 @@ +const assert = require('chai').assert; +const chai = require('chai'); +const expect = require('chai').expect; +const extend = require('extend'); +const finder = require('../lib/eventFinder.js'); +const receiver = require('../lib/WebhookDriver.js'); +const Q = require('q'); + +describe('Event Receiver', () => { + it('will find 1 document', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }] + ]; + const eventDocuments = [{ 'http://test2': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(1); + expect(found[0].url).to.be.equal('http://test1'); + }); + }); + it('will not find any documents', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }] + ]; + const eventDocuments = [{ 'http://test1': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(0); + }); + }); + it('will stop finding at first found document', () => { + const events = [ + [{ url: 'http://test1' }, { url: 'http://test2' }, { url: 'http://test3' }] + ]; + const eventDocuments = [{ 'http://test2': { etag: 34 } }]; + const instance = createFinder(events, eventDocuments); + + return instance.getNewEvents('http://test.com').then(found => { + expect(found.length).to.be.equal(1); + expect(found[0].url).to.be.equal('http://test1'); + }); + }); +}); + +function createFinder(events, documents) { + const eventStore = createStore(documents); + const requestor = createRequestor(events); + return new finder(requestor, eventStore); +} + +function createRequestor(pages) { + const result = {}; + result.getAll = () => { + return Q(pages.shift()); + }; + return result; +} + +function createStore(documents) { + const result = {}; + const hash = documents.reduce((collection, document) => { + extend(collection, document); + return collection; + }, {}); + result.etag = url => { + return hash[url]; + }; + return result; +} \ No newline at end of file diff --git a/test/memoryDocStore.js b/test/memoryDocStore.js new file mode 100644 index 0000000..61e7a73 --- /dev/null +++ b/test/memoryDocStore.js @@ -0,0 +1,23 @@ +class MemoryDocStore { + constructor() { + } + + connect() { + this.store = {}; + } + + upsert(document, callback) { + const selfHref = document._metadata.links.self.href; + this.store[selfHref] = { etag = Date.now, document: document }; + callback(); + } + + etag(url) { + const result = this.store[url]; + return result ? result.etag : null; + } + + close() { + this.store = null; + } +} \ No newline at end of file