This commit is contained in:
Gene Hazan 2018-03-02 14:28:04 -08:00
Родитель a64f6a2524
Коммит 28a2be71fa
6 изменённых файлов: 313 добавлений и 8 удалений

Просмотреть файл

@ -3,11 +3,14 @@
"browser": false,
"commonjs": true,
"es6": true,
"node": true
"node": true,
"mocha": true
},
"extends": "eslint:recommended",
"parserOptions": {
"ecmaFeatures": {
"jsx": true
"ecmaVersion": 8,
"jsx": false
},
"sourceType": "module"
},
@ -18,6 +21,7 @@
"no-unreachable": "warn",
"no-unused-vars": "warn",
"constructor-super": "warn",
"valid-typeof": "warn"
"valid-typeof": "warn",
"quotes": ["warn", "single"]
}
}

Просмотреть файл

@ -5,6 +5,7 @@ module.exports = {
queue: {
amqp: require('./queuing/amqpFactory'),
amqp10: require('./queuing/amqp10Factory'),
serviceBus: require('./queuing/serviceBusFactory'),
memory: require('./queuing/memoryFactory'),
amqp10Subscription: require('./queuing/amqp10SubscriptionFactory'),
webhook: require('./queuing/webhookFactory')

Просмотреть файл

@ -0,0 +1,20 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
const ServiceBusQueueManager = require('./serviceBusQueueManager');
const CrawlerFactory = require('../../crawlerFactory');
// {
// connectionString: config.get('CRAWLER_SERVICEBUS_CONNECTION_STRING') || config.get('CRAWLER_SERVICEBUS_MANAGER_ENDPOINT')
// }
module.exports = options => {
const { connectionString } = options;
const manager = new ServiceBusQueueManager(null, connectionString);
const env = process.env.NODE_ENV;
let tracker;
if (options.tracker) {
tracker = CrawlerFactory.createRequestTracker(`${env}:ServiceBus:${options.queueName}`, options);
}
return CrawlerFactory.createQueueSet(manager, tracker, options);
}

Просмотреть файл

@ -0,0 +1,144 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// SPDX-License-Identifier: MIT
const Q = require('q');
const qlimit = require('qlimit');
class ServiceBusQueue {
constructor(client, name, queueName, formatter, manager, options) {
this.client = client;
this.name = name;
this.queueName = queueName;
this.messageFormatter = formatter;
this.manager = manager;
this.options = options;
this.logger = options.logger;
}
subscribe() {
return this.manager.createQueue(this.queueName, this.options);
}
unsubscribe() {
return Q();
}
push(requests) {
requests = Array.isArray(requests) ? requests : [requests];
return Q.all(requests.map(qlimit(this.options.parallelPush || 1)(request => {
const body = JSON.stringify(request);
const deferred = Q.defer();
this.client.sendQueueMessage(this.queueName, body, (error) => {
if (error) {
return deferred.reject(error);
}
this._incrementMetric('push');
this._log('Queued', request);
deferred.resolve();
});
return deferred.promise;
})));
}
pop() {
const deferred = Q.defer();
this.client.receiveQueueMessage(this.queueName, { isPeekLock: true }, (error, message) => {
if (error === 'No messages to receive') {
this.logger.verbose(error);
return Q();
}
if (error) {
return deferred.reject(new Error(error));
}
this._incrementMetric('pop');
const request = this.messageFormatter(JSON.parse(message.body));
request._message = message;
this._log('Popped', request);
this._setLockRenewalTimer(request, 0, this.options.lockRenewal || 4.75 * 60 * 1000);
deferred.resolve(request);
});
return deferred.promise;
}
done(request) {
if (!request || !request._message) {
return Q();
}
const deferred = Q.defer();
this.client.deleteMessage(request._message, (error) => {
if (error) {
return deferred.reject(error);
}
this._incrementMetric('done');
this._log('ACKed', request);
clearTimeout(request._timeoutId);
deferred.resolve();
});
return deferred.promise;
}
defer(request) {
this._incrementMetric('defer');
return this.abandon(request);
}
abandon(request) {
const deferred = Q.defer();
this.client.unlockMessage(request._message, (error) => {
if (error) {
return deferred.reject(error);
}
this._incrementMetric('abandon');
this._log('NAKed', request);
clearTimeout(request._timeoutId);
deferred.resolve();
});
}
flush() {
return this.manager.flushQueue(this.queueName).then(() => this);
}
getInfo() {
return this.manager.getInfo(this.queueName).then(info => {
if (!info) {
return null;
}
info.metricsName = `${this.options.queueName}:${this.name}`;
return info;
});
}
getName() {
return this.name;
}
_incrementMetric(operation) {
const metrics = this.logger.metrics;
if (metrics && metrics[this.name] && metrics[this.name][operation]) {
metrics[this.name][operation].incr();
}
}
_setLockRenewalTimer(request, attempts = 0, delay = 4.5 * 60 * 1000) {
attempts++;
const timeoutId = setTimeout(() => {
this.client.renewLockForMessage(request._message, (renewLockError) => {
if (renewLockError) {
this.logger.error(renewLockError);
}
this.logger.verbose(`Renewed lock on ${request.type} ${request.url}, attempt ${attempts}`);
request._renewLockAttemptCount = attempts;
this._setLockRenewalTimer(request, attempts, delay);
});
}, delay);
request._timeoutId = timeoutId;
}
_log(actionMessage, request) {
const attemptString = request._renewLockAttemptCount ? ` (attempt ${request._renewLockAttemptCount})` : '';
this.logger.verbose(`${actionMessage} ${request.type} ${request.url}${attemptString}`);
}
}
module.exports = ServiceBusQueue;

Просмотреть файл

@ -9,6 +9,7 @@ const InMemoryRateLimiter = require('../limiting/inmemoryRateLimiter');
const RateLimitedPushQueue = require('./ratelimitedPushQueue');
const Request = require('../../lib/request');
const serviceBus = require('azure-sb');
const ServiceBusQueue = require('./serviceBusQueue');
const TrackedQueue = require('./trackedQueue');
const Q = require('q');
@ -34,6 +35,9 @@ class ServiceBusQueueManager {
}
_createClient(name, queueName, formatter, options) {
if (!this.amqpUrl) {
return new ServiceBusQueue(this.serviceBusService, name, queueName, formatter, this, options);
}
return new Amqp10Queue(this._getClient(), name, queueName, formatter, this, options);
}
@ -85,15 +89,15 @@ class ServiceBusQueueManager {
return deferred.promise;
}
createQueue(name) {
const options = {
createQueue(name, options = {}) {
const queueOptions = {
EnablePartitioning: true,
LockDuration: 'PT5M',
LockDuration: options.lockDuration || 'PT5M',
DefaultMessageTimeToLive: 'P10675199D',
MaxDeliveryCount: '10000000'
MaxDeliveryCount: options.maxDeliveryCount ? options.maxDeliveryCount.toString() : '10000000'
};
const deferred = Q.defer();
this.serviceBusService.createQueueIfNotExists(name, options, (error, created, response) => {
this.serviceBusService.createQueueIfNotExists(name, queueOptions, (error, created, response) => {
if (error) {
return deferred.reject(error);
}

Просмотреть файл

@ -0,0 +1,132 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// Run ./node_modules/mocha/bin/mocha test/integration/serviceBusQueueTests.js --timeout 60000
const config = require('painless-config');
const { expect } = require('chai');
const { after, before, describe, it } = require('mocha');
const { promisify } = require('util');
const Request = require('../../lib/request');
const ServiceBusQueue = require('../../providers/queuing/serviceBusQueue');
const ServiceBusQueueManager = require('../../providers/queuing/serviceBusQueueManager');
const connectionString = config.get('CRAWLER_SERVICEBUS_CONNECTION_STRING') || config.get('CRAWLER_SERVICEBUS_MANAGER_ENDPOINT');
const name = config.get('CRAWLER_NAME');
const queueName = 'sb-test';
const formatter = message => {
Request.adopt(message);
return message;
};
const options = {
logger: {
verbose: console.log,
error: console.error
},
queueName,
lockDuration: 'PT4S', // 4 sec
lockRenewal: 3000, // 3 sec
maxDeliveryCount: 100,
_config: { on: () => { } }
};
let serviceBusQueue = null;
describe('AMQP 1.0 Integration', () => {
before(async () => {
if (!connectionString) {
throw new Error('ServiceBus connectionString not configured.');
}
const manager = new ServiceBusQueueManager(null, connectionString);
serviceBusQueue = new ServiceBusQueue(manager.serviceBusService, name, queueName, formatter, manager, options);
await serviceBusQueue.subscribe();
});
after(async () => {
await serviceBusQueue.flush();
});
it('Should push, pop and ack a message', async () => {
let info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
const msg = new Request('test1', 'test://test/test1');
await serviceBusQueue.push(msg);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await setTimeout[promisify.custom](1000);
const request = await serviceBusQueue.pop();
expect(request).to.exist;
expect(request instanceof Request).to.be.true;
expect(request._timeoutId).to.exist;
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await serviceBusQueue.done(request);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
});
it('Should push, pop, nack, pop and ack a message', async () => {
let info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
const msg = new Request('test2', 'test://test/test2');
await serviceBusQueue.push(msg);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await setTimeout[promisify.custom](4000);
let request = await serviceBusQueue.pop();
expect(request).to.exist;
expect(request instanceof Request).to.be.true;
expect(request._timeoutId).to.exist;
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await serviceBusQueue.abandon(request);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
request = await serviceBusQueue.pop();
expect(request).to.exist;
expect(request instanceof Request).to.be.true;
expect(request._timeoutId).to.exist;
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await serviceBusQueue.done(request);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
});
it('Should push, pop, wait for lock to be renewed and ack a message', async () => {
let info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
const msg = new Request('test3', 'test://test/test3');
await serviceBusQueue.push(msg);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
await setTimeout[promisify.custom](4000);
let request = await serviceBusQueue.pop();
expect(request).to.exist;
expect(request instanceof Request).to.be.true;
expect(request._timeoutId).to.exist;
expect(request._message.brokerProperties.LockedUntilUtc).to.exist;
expect(new Date(request._message.brokerProperties.LockedUntilUtc)).to.be.greaterThan(new Date());
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(1);
expect(request._renewLockAttemptCount).to.be.undefined;
const timerAsyncId = getTimerAsyncId(request._timeoutId);
await setTimeout[promisify.custom](4000);
expect(request._renewLockAttemptCount).to.equal(1);
expect(getTimerAsyncId(request._timeoutId)).not.to.be.equal(timerAsyncId);
await serviceBusQueue.done(request);
info = await serviceBusQueue.getInfo();
expect(Number(info.count)).to.equal(0);
});
});
function getTimerAsyncId(timeoutId) {
return timeoutId[Object.getOwnPropertySymbols(timeoutId)[0]];
}