add a ratelimiter to the fetcher

This commit is contained in:
Jeff McAffer 2017-01-16 15:21:27 -08:00
Родитель 2b2872d32a
Коммит 20638810f1
1 изменённых файлов: 27 добавлений и 38 удалений

Просмотреть файл

@ -7,22 +7,23 @@ const URL = require('url');
class GitHubFetcher {
constructor(requestor, store, tokenFactory, options) {
constructor(requestor, store, tokenFactory, limiter, options) {
this.requestor = requestor;
this.store = store;
this.tokenFactory = tokenFactory;
this.limiter = limiter;
this.options = options;
this.logger = options.logger;
this.options._config.on('changed', this._reconfigure.bind(this));
this.fetchQueue = async.queue(this._callGitHubTask.bind(this), options.concurrency || 5);
// this.options._config.on('changed', this._reconfigure.bind(this));
// this.fetchQueue = async.queue(this._callGitHubTask.bind(this), options.concurrency || 5);
}
_reconfigure(current, changes) {
if (changes.some(patch => patch.path === '/concurrency')) {
this.fetchQueue.concurrency = this.options.concurrency;
}
return Q();
}
// _reconfigure(current, changes) {
// if (changes.some(patch => patch.path === '/concurrency')) {
// this.fetchQueue.concurrency = this.options.concurrency;
// }
// return Q();
// }
fetch(request) {
const initial = request.policy.initialFetch(request);
@ -86,42 +87,30 @@ class GitHubFetcher {
}
_getFromGitHub(request, options) {
const start = Date.now();
const deferred = Q.defer();
const url = this._addTokenToUrl(request, options);
this.fetchQueue.push({ url: url, options: options }, (error, response) => {
if (error) {
return deferred.reject(error);
}
request.addMeta({ status: response.statusCode, fetch: Date.now() - start });
deferred.resolve(response);
});
return deferred.promise;
const [token, url] = this._addTokenToUrl(request, options);
const key = `${token.slice(0, 2)}${token.slice(-2)}`;
const operation = () => {
this._incrementMetric('fetch');
const start = Date.now();
return this.requestor.get(url, options).then(response => {
request.addMeta({ status: response.statusCode, fetch: Date.now() - start });
return response;
});
};
return this.limiter.run(key, operation);
}
_addTokenToUrl(request, options) {
let token = options.headers.authorization;
if (!token) {
return request.url;
let header = options.headers.authorization;
if (!header) {
return ['', request.url];
}
const urlSpec = URL.parse(request.url, true);
urlSpec.query.access_token = token.slice(6);
const token = header.slice(6);
urlSpec.query.access_token = token;
delete urlSpec.search;
delete options.headers.authorization;
return URL.format(urlSpec);
}
_callGitHubTask(spec, callback) {
try {
this._incrementMetric('fetch');
this.requestor.get(spec.url, spec.options).then(
response =>
callback(null, response),
error =>
callback(error));
} catch (e) {
callback(e);
}
return [token, URL.format(urlSpec)];
}
_requeueBenched(request, benchTime) {