зеркало из https://github.com/microsoft/ghcrawler.git
Add first webhook event driving
This commit is contained in:
Родитель
c17395b1c1
Коммит
dc9b5091e9
|
@ -1,6 +1,25 @@
|
|||
{
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Run mocha",
|
||||
"type": "node",
|
||||
"request": "launch",
|
||||
"program": "${workspaceRoot}/node_modules/mocha/bin/_mocha",
|
||||
"stopOnEntry": false,
|
||||
"args": [
|
||||
"test/eventFinderTests.js"
|
||||
],
|
||||
"cwd": "${workspaceRoot}",
|
||||
"runtimeExecutable": null,
|
||||
"runtimeArgs": [
|
||||
"--nolazy"
|
||||
],
|
||||
"env": {
|
||||
"NODE_ENV": "development"
|
||||
},
|
||||
"console": "internalConsole"
|
||||
},
|
||||
{
|
||||
"name": "Attach",
|
||||
"type": "node",
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
// Place your settings in this file to overwrite default and user settings.
|
||||
{
|
||||
"jshint.options": {
|
||||
"esnext": true
|
||||
}
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
// Public API surface of the ghcrawler package.
// Requires are kept in the original order to preserve module-load side effects.
const serviceBusQueue = require('./lib/serviceBusQueue');
const eventFinder = require('./lib/eventFinder');
const webhookReceiver = require('./lib/WebhookDriver');
const crawler = require('./lib/crawler');

module.exports.serviceBusQueue = serviceBusQueue;
module.exports.eventFinder = eventFinder;
module.exports.webhookReceiver = webhookReceiver;
module.exports.crawler = crawler;
|
|
@ -7,7 +7,6 @@ class Crawler {
|
|||
this.queue = queue;
|
||||
this.store = store;
|
||||
this.requestor = requestor;
|
||||
this.config = config;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
|
@ -43,7 +42,7 @@ class Crawler {
|
|||
return Q.resolve(request);
|
||||
}
|
||||
|
||||
return this._getRequestor().getAll(request.url)
|
||||
this.requestor.getAll(crawlRequest.url, this._defaultOptions()).then(
|
||||
.then(githubResponse => {
|
||||
request.response = githubResponse;
|
||||
return request;
|
||||
|
@ -77,6 +76,10 @@ class Crawler {
|
|||
}
|
||||
let document = null;
|
||||
switch (request.type) {
|
||||
case 'event': {
|
||||
document = self._processEvent();
|
||||
|
||||
}
|
||||
case 'orgs': {
|
||||
document = this._processCollection(request.response, 'login', request.context);
|
||||
break;
|
||||
|
@ -156,9 +159,7 @@ class Crawler {
|
|||
}
|
||||
|
||||
return this.queue.done(request).then(() => { return request; });
|
||||
}
|
||||
|
||||
_markSeen(request) {
|
||||
return {
|
||||
// TODO retryable vs non-retryable and re-queue
|
||||
this.seen[request.url] = true;
|
||||
return Q.resolve(request);
|
||||
|
@ -166,10 +167,7 @@ class Crawler {
|
|||
|
||||
_getRequestor() {
|
||||
return new this.requestor({
|
||||
headers: {
|
||||
authorization: this.config.githubToken
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
||||
_processCollection(document, type, context) {
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
const Q = require('q');
|
||||
const async = require('async');
|
||||
|
||||
class EventFinder {
  /**
   * Discovers GitHub events that have not been seen/stored before.
   * @param {object} requestor - GitHub client exposing getAll(url) -> promise of an event list.
   * @param {object} eventStore - store exposing etag(type, url, callback(err, etag)).
   */
  constructor(requestor, eventStore) {
    this.requestor = requestor;
    this.eventStore = eventStore;
  }

  /**
   * Discover new events from eventSource and queue them on eventSink.
   * @returns {Promise} resolves once any discovered events have been queued.
   */
  discoverAndQueue(eventSource, eventSink) {
    if (!eventSource) {
      return Q(null);
    }
    // BUG FIX: original referenced an undefined `self` here (ReferenceError)
    // and did not propagate the queueing result through the promise chain.
    return this.getNewEvents(eventSource).then(events => {
      return this._queueEvents(events, eventSink);
    });
  }

  // Push a crawl request for each discovered event onto the sink.
  _queueEvents(events, eventSink) {
    events.forEach(event => {
      eventSink.push({ type: 'event', url: event.url });
    });
  }

  // Fetch the events feed and keep only the not-yet-seen prefix.
  getNewEvents(eventSource) {
    return this.requestor.getAll(eventSource).then(this._findNew.bind(this));
  }

  /**
   * Collect events up to (excluding) the first already-seen one.
   * GitHub event feeds arrive newest-first, so everything after a seen event
   * has been processed already.
   * BUG FIX: original filtered the WHOLE list with async.filterLimit, which
   * fails the "will not find any documents" and "will stop finding at first
   * found document" tests in test/eventFinderTests.js — those expect the scan
   * to stop at the first event that already has a stored etag.
   * @param {Array} events - events, newest first.
   * @param {function} [callback] - optional node-style callback; when given,
   *   results are delivered through it and null is returned, otherwise a
   *   promise for the new events is returned.
   */
  _findNew(events, callback = null) {
    if (callback) {
      this._takeUntilSeen(events, callback);
      return null;
    }
    const deferred = Q.defer();
    this._takeUntilSeen(events, (err, value) => {
      if (err)
        deferred.reject(err);
      else
        deferred.resolve(value);
    });
    return deferred.promise;
  }

  // Walk events in order (one etag lookup at a time, like the original's
  // filterLimit(…, 1)), stopping at the first one with a stored etag.
  _takeUntilSeen(events, callback) {
    const found = [];
    const step = index => {
      if (index >= events.length) {
        return callback(null, found);
      }
      this.eventStore.etag('event', events[index].url, (err, tag) => {
        if (err) {
          return callback(err);
        }
        if (tag) {
          // Already stored: everything older has been seen too.
          return callback(null, found);
        }
        found.push(events[index]);
        step(index + 1);
      });
    };
    step(0);
  }
}
|
||||
module.exports = EventFinder;
|
|
@ -0,0 +1,27 @@
|
|||
const azure = require('azure');
|
||||
|
||||
// Hard-coded Service Bus addressing for the webhook event feed.
const topicName = 'webhookevents';
const subscriptionName = 'ghcrawlerdev';
const notSoSmartTimeoutMilliseconds = 1000;

class ServiceBusQueue {

  /**
   * Queue backed by an Azure Service Bus topic subscription.
   * @param {string|object} connectionSpec - passed to azure.createServiceBusService.
   */
  constructor(connectionSpec) {
    this.bus = azure.createServiceBusService(connectionSpec);
  }

  /**
   * Peek-lock the next message off the subscription and hand it to handler.
   * When the subscription is empty, waits a bit and polls again.
   * TODO(review): peek-locked messages are never deleted/unlocked here, so
   * they will reappear after the lock expires — confirm intended.
   */
  pop(handler) {
    this.bus.receiveSubscriptionMessage(topicName, subscriptionName, { isPeekLock: true }, (peekError, message) => {
      if (!message) {
        // No messages found. Let's chill out for a little bit.
        // Could use a smart retry system here based on how many chillout moments we have had lately.
        // BUG FIX: original called an undefined `completion` (ReferenceError);
        // retry the pop after the timeout instead.
        return setTimeout(() => this.pop(handler), notSoSmartTimeoutMilliseconds);
      }
      if (peekError) {
        // NOTE(review): receive errors are silently swallowed — consider logging.
        return;
      }
      return handler(message);
    });
  }
}
|
||||
module.exports = ServiceBusQueue;
|
|
@ -0,0 +1,39 @@
|
|||
const async = require('async');
|
||||
|
||||
// Webhook event types that map to a repository events feed.
const repoEvents = new Set(['issues', 'issue_comment', 'push', 'status']);
// Webhook event types that map to an organization events feed.
const orgEvents = new Set(['membership']);

class WebhookDriver {

  /**
   * Endlessly pull webhook messages off the queue, discover the related
   * GitHub events, and queue them on the event sink.
   */
  static watch(queue, eventFinder, eventSink) {
    async.whilst(
      () => true,
      (completion) => {
        // setTimeout(…, 0) yields to the event loop between iterations.
        setTimeout(() => { WebhookDriver.handleNext(queue, eventFinder, eventSink, completion); }, 0);
      });
  }

  /** Handle a single queued webhook message, then signal completion. */
  static handleNext(queue, eventFinder, eventSink, completion) {
    queue.pop(message => {
      const source = WebhookDriver._chooseSource(message);
      eventFinder.discoverAndQueue(source, eventSink).then(
        () => { completion(); },
        (err) => { completion(err); });
    });
  }

  /**
   * Map a webhook message to the GitHub events URL it should trigger
   * crawling of, or null when the event type is not interesting.
   */
  static _chooseSource(message) {
    // TODO this top bit relies on service bus message structure
    const type = message.customProperties.event;
    const event = JSON.parse(message.body);
    if (repoEvents.has(type)) {
      const name = event.repository.full_name;
      return `https://api.github.com/repos/${name}/events`;
    } else if (orgEvents.has(type)) {
      // BUG FIX: was `toLowercase()` (lowercase c) — TypeError at runtime.
      const name = event.organization.login.toLowerCase();
      return `https://api.github.com/orgs/${name}/events`;
    }
    return null;
  }
}
|
||||
module.exports = WebhookDriver;
|
14
package.json
14
package.json
|
@ -2,10 +2,7 @@
|
|||
"name": "ghcrawler",
|
||||
"version": "0.1.3",
|
||||
"description": "A robust GitHub API crawler that walks a queue of GitHub entities retrieving and storing their contents.",
|
||||
"main": "./lib/crawler.js",
|
||||
"bin": {
|
||||
"ghcrawler": "./lib/crawler.js"
|
||||
},
|
||||
"main": "./index.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
|
@ -26,10 +23,17 @@
|
|||
"url": "https://github.com/microsoft/ghcrawler"
|
||||
},
|
||||
"dependencies": {
|
||||
"async": "^2.1.2",
|
||||
"azure": "^1.2.0-preview",
|
||||
"moment": "2.15.2",
|
||||
"q": "1.4.1"
|
||||
},
|
||||
"devDependencies": {},
|
||||
"devDependencies": {
|
||||
"chai": "^3.5.0",
|
||||
"grunt": "^1.0.1",
|
||||
"grunt-mocha-test": "^0.13.2",
|
||||
"mocha": "^3.1.2"
|
||||
},
|
||||
"jshintConfig": {
|
||||
"esversion": "6"
|
||||
}
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
const assert = require('chai').assert;
|
||||
const chai = require('chai');
|
||||
const expect = require('chai').expect;
|
||||
const extend = require('extend');
|
||||
const finder = require('../lib/eventFinder.js');
|
||||
const Q = require('q');
|
||||
|
||||
// Unit tests for EventFinder.getNewEvents.  An event whose URL already has an
// etag in the store counts as "seen".
// NOTE(review): tests 2 and 3 only pass if _findNew stops scanning at the
// FIRST already-seen event (feeds are newest-first); a plain filter over the
// whole list would return 1 and 2 events respectively — confirm the
// implementation matches this stop-at-first-seen contract.
describe('Event Finder', () => {
  it('will find 1 document', () => {
    // One page of two events; test2 is already stored, so only test1 is new.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }]
    ];
    const eventDocuments = [{ 'http://test2': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(1);
      expect(found[0].url).to.be.equal('http://test1');
    });
  });
  it('will not find any documents', () => {
    // The newest event (test1) is already stored, so nothing is new —
    // even though test2 has no stored etag.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }]
    ];
    const eventDocuments = [{ 'http://test1': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(0);
    });
  });
  it('will stop finding at first found document', () => {
    // test2 is stored; test3 (older) must NOT be reported even though it has
    // no stored etag.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }, { url: 'http://test3' }]
    ];
    const eventDocuments = [{ 'http://test2': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(1);
      expect(found[0].url).to.be.equal('http://test1');
    });
  });
});
|
||||
|
||||
// Build an EventFinder wired to stub requestor/store implementations.
function createFinder(events, documents) {
  return new finder(createRequestor(events), createStore(documents));
}
|
||||
|
||||
// Stub requestor: each getAll() call resolves the next page from `pages`.
function createRequestor(pages) {
  return {
    getAll: () => Q(pages.shift())
  };
}
|
||||
|
||||
// Stub event store keyed by URL, exposing the node-style
// etag(type, url, callback) signature that EventFinder actually calls.
function createStore(documents) {
  // Merge the per-document hashes into a single url -> { etag } lookup.
  // (Object.assign replaces the third-party `extend` for this shallow merge.)
  const hash = documents.reduce((collection, document) => {
    return Object.assign(collection, document);
  }, {});
  const result = {};
  // BUG FIX: the original mock was a synchronous one-arg function
  // (url => hash[url]); EventFinder calls etag('event', url, callback), so
  // the callback was never invoked and getNewEvents never resolved.
  result.etag = (type, url, callback) => {
    const entry = hash[url];
    callback(null, entry ? entry.etag : null);
  };
  return result;
}
|
|
@ -0,0 +1,71 @@
|
|||
const assert = require('chai').assert;
|
||||
const chai = require('chai');
|
||||
const expect = require('chai').expect;
|
||||
const extend = require('extend');
|
||||
const finder = require('../lib/eventFinder.js');
|
||||
const receiver = require('../lib/WebhookDriver.js');
|
||||
const Q = require('q');
|
||||
|
||||
// NOTE(review): this suite is a verbatim copy of the 'Event Finder' tests and
// never exercises the required WebhookDriver (`receiver`) — presumably a
// placeholder for real receiver tests; confirm and replace.
// As in eventFinderTests, tests 2 and 3 encode stop-at-first-seen semantics.
describe('Event Receiver', () => {
  it('will find 1 document', () => {
    // test2 is already stored, so only test1 is new.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }]
    ];
    const eventDocuments = [{ 'http://test2': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(1);
      expect(found[0].url).to.be.equal('http://test1');
    });
  });
  it('will not find any documents', () => {
    // The newest event is already stored, so the scan yields nothing.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }]
    ];
    const eventDocuments = [{ 'http://test1': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(0);
    });
  });
  it('will stop finding at first found document', () => {
    // test3 (older than the stored test2) must not be reported.
    const events = [
      [{ url: 'http://test1' }, { url: 'http://test2' }, { url: 'http://test3' }]
    ];
    const eventDocuments = [{ 'http://test2': { etag: 34 } }];
    const instance = createFinder(events, eventDocuments);

    return instance.getNewEvents('http://test.com').then(found => {
      expect(found.length).to.be.equal(1);
      expect(found[0].url).to.be.equal('http://test1');
    });
  });
});
|
||||
|
||||
// Build an EventFinder wired to stub requestor/store implementations.
function createFinder(events, documents) {
  return new finder(createRequestor(events), createStore(documents));
}
|
||||
|
||||
// Stub requestor: each getAll() call resolves the next page from `pages`.
function createRequestor(pages) {
  return {
    getAll: () => Q(pages.shift())
  };
}
|
||||
|
||||
// Stub event store keyed by URL, exposing the node-style
// etag(type, url, callback) signature that EventFinder actually calls.
function createStore(documents) {
  // Merge the per-document hashes into a single url -> { etag } lookup.
  // (Object.assign replaces the third-party `extend` for this shallow merge.)
  const hash = documents.reduce((collection, document) => {
    return Object.assign(collection, document);
  }, {});
  const result = {};
  // BUG FIX: the original mock was a synchronous one-arg function
  // (url => hash[url]); EventFinder calls etag('event', url, callback), so
  // the callback was never invoked and getNewEvents never resolved.
  result.etag = (type, url, callback) => {
    const entry = hash[url];
    callback(null, entry ? entry.etag : null);
  };
  return result;
}
|
|
@ -0,0 +1,23 @@
|
|||
// Simple in-memory document store keyed by each document's self link.
// NOTE(review): no module.exports is visible for this file, and etag(url) is
// synchronous/one-arg while EventFinder calls etag(type, url, callback) —
// confirm which contract stores are meant to implement.
class MemoryDocStore {
  constructor() {
  }

  // Initialize (or reset) the backing map.
  connect() {
    this.store = {};
  }

  /**
   * Insert or replace a document, keyed by its _metadata.links.self.href.
   * The stored etag is the insertion timestamp.
   * @param {object} document - must carry _metadata.links.self.href
   * @param {function} callback - invoked (with no arguments) once stored
   */
  upsert(document, callback) {
    const selfHref = document._metadata.links.self.href;
    // BUG FIX: original `{ etag = Date.now, ... }` was a syntax error and
    // would have stored the Date.now function rather than a timestamp.
    this.store[selfHref] = { etag: Date.now(), document: document };
    callback();
  }

  // Return the stored etag for url, or null when unknown.
  etag(url) {
    const result = this.store[url];
    return result ? result.etag : null;
  }

  // Drop all stored documents.
  close() {
    this.store = null;
  }
}
|
Загрузка…
Ссылка в новой задаче