diff --git a/.eslintrc.json b/.eslintrc.json index 32fb80a..99c4125 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -3,11 +3,14 @@ "browser": false, "commonjs": true, "es6": true, - "node": true + "node": true, + "mocha": true }, + "extends": "eslint:recommended", "parserOptions": { "ecmaFeatures": { - "jsx": true + "ecmaVersion": 8, + "jsx": false }, "sourceType": "module" }, @@ -18,6 +21,7 @@ "no-unreachable": "warn", "no-unused-vars": "warn", "constructor-super": "warn", - "valid-typeof": "warn" + "valid-typeof": "warn", + "quotes": ["warn", "single"] } } \ No newline at end of file diff --git a/providers/index.js b/providers/index.js index 52eb181..c11c963 100644 --- a/providers/index.js +++ b/providers/index.js @@ -5,6 +5,7 @@ module.exports = { queue: { amqp: require('./queuing/amqpFactory'), amqp10: require('./queuing/amqp10Factory'), + serviceBus: require('./queuing/serviceBusFactory'), memory: require('./queuing/memoryFactory'), amqp10Subscription: require('./queuing/amqp10SubscriptionFactory'), webhook: require('./queuing/webhookFactory') diff --git a/providers/queuing/serviceBusFactory.js b/providers/queuing/serviceBusFactory.js new file mode 100644 index 0000000..1160502 --- /dev/null +++ b/providers/queuing/serviceBusFactory.js @@ -0,0 +1,20 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SPDX-License-Identifier: MIT + +const ServiceBusQueueManager = require('./serviceBusQueueManager'); +const CrawlerFactory = require('../../crawlerFactory'); + +// { +// connectionString: config.get('CRAWLER_SERVICEBUS_CONNECTION_STRING') || config.get('CRAWLER_SERVICEBUS_MANAGER_ENDPOINT') +// } + +module.exports = options => { + const { connectionString } = options; + const manager = new ServiceBusQueueManager(null, connectionString); + const env = process.env.NODE_ENV; + let tracker; + if (options.tracker) { + tracker = CrawlerFactory.createRequestTracker(`${env}:ServiceBus:${options.queueName}`, options); + } + return CrawlerFactory.createQueueSet(manager, tracker, options); +} \ No newline at end of file diff --git a/providers/queuing/serviceBusQueue.js b/providers/queuing/serviceBusQueue.js new file mode 100644 index 0000000..1fa5dc5 --- /dev/null +++ b/providers/queuing/serviceBusQueue.js @@ -0,0 +1,144 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// SPDX-License-Identifier: MIT + +const Q = require('q'); +const qlimit = require('qlimit'); + +class ServiceBusQueue { + constructor(client, name, queueName, formatter, manager, options) { + this.client = client; + this.name = name; + this.queueName = queueName; + this.messageFormatter = formatter; + this.manager = manager; + this.options = options; + this.logger = options.logger; + } + + subscribe() { + return this.manager.createQueue(this.queueName, this.options); + } + + unsubscribe() { + return Q(); + } + + push(requests) { + requests = Array.isArray(requests) ? requests : [requests]; + return Q.all(requests.map(qlimit(this.options.parallelPush || 1)(request => { + const body = JSON.stringify(request); + const deferred = Q.defer(); + this.client.sendQueueMessage(this.queueName, body, (error) => { + if (error) { + return deferred.reject(error); + } + this._incrementMetric('push'); + this._log('Queued', request); + deferred.resolve(); + }); + return deferred.promise; + }))); + } + + pop() { + const deferred = Q.defer(); + this.client.receiveQueueMessage(this.queueName, { isPeekLock: true }, (error, message) => { + if (error === 'No messages to receive') { + this.logger.verbose(error); + return Q(); + } + if (error) { + return deferred.reject(new Error(error)); + } + this._incrementMetric('pop'); + const request = this.messageFormatter(JSON.parse(message.body)); + request._message = message; + this._log('Popped', request); + this._setLockRenewalTimer(request, 0, this.options.lockRenewal || 4.75 * 60 * 1000); + deferred.resolve(request); + }); + return deferred.promise; + } + + done(request) { + if (!request || !request._message) { + return Q(); + } + const deferred = Q.defer(); + this.client.deleteMessage(request._message, (error) => { + if (error) { + return deferred.reject(error); + } + this._incrementMetric('done'); + this._log('ACKed', request); + clearTimeout(request._timeoutId); + deferred.resolve(); + }); + return deferred.promise; + } + + defer(request) { + this._incrementMetric('defer'); + return this.abandon(request); + } + + abandon(request) { + const deferred = Q.defer(); + this.client.unlockMessage(request._message, (error) => { + if (error) { + return deferred.reject(error); + } + this._incrementMetric('abandon'); + this._log('NAKed', request); + clearTimeout(request._timeoutId); + deferred.resolve(); + }); + } + + flush() { + return this.manager.flushQueue(this.queueName).then(() => this); + } + + getInfo() { + return this.manager.getInfo(this.queueName).then(info => { + if (!info) { + return null; + } + info.metricsName = `${this.options.queueName}:${this.name}`; + return info; + }); + } + + getName() { + return this.name; + } + + _incrementMetric(operation) { + const metrics = this.logger.metrics; + if (metrics && metrics[this.name] && metrics[this.name][operation]) { + metrics[this.name][operation].incr(); + } + } + + _setLockRenewalTimer(request, attempts = 0, delay = 4.5 * 60 * 1000) { + attempts++; + const timeoutId = setTimeout(() => { + this.client.renewLockForMessage(request._message, (renewLockError) => { + if (renewLockError) { + this.logger.error(renewLockError); + } + this.logger.verbose(`Renewed lock on ${request.type} ${request.url}, attempt ${attempts}`); + request._renewLockAttemptCount = attempts; + this._setLockRenewalTimer(request, attempts, delay); + }); + }, delay); + request._timeoutId = timeoutId; + } + + _log(actionMessage, request) { + const attemptString = request._renewLockAttemptCount ? ` (attempt ${request._renewLockAttemptCount})` : ''; + this.logger.verbose(`${actionMessage} ${request.type} ${request.url}${attemptString}`); + } +} + +module.exports = ServiceBusQueue; diff --git a/providers/queuing/serviceBusQueueManager.js b/providers/queuing/serviceBusQueueManager.js index 58dcb68..1079dc8 100644 --- a/providers/queuing/serviceBusQueueManager.js +++ b/providers/queuing/serviceBusQueueManager.js @@ -9,6 +9,7 @@ const InMemoryRateLimiter = require('../limiting/inmemoryRateLimiter'); const RateLimitedPushQueue = require('./ratelimitedPushQueue'); const Request = require('../../lib/request'); const serviceBus = require('azure-sb'); +const ServiceBusQueue = require('./serviceBusQueue'); const TrackedQueue = require('./trackedQueue'); const Q = require('q'); @@ -34,6 +35,9 @@ class ServiceBusQueueManager { } _createClient(name, queueName, formatter, options) { + if (!this.amqpUrl) { + return new ServiceBusQueue(this.serviceBusService, name, queueName, formatter, this, options); + } return new Amqp10Queue(this._getClient(), name, queueName, formatter, this, options); } @@ -85,15 +89,15 @@ class ServiceBusQueueManager { return deferred.promise; } - createQueue(name) { - const options = { + createQueue(name, options = {}) { + const queueOptions = { EnablePartitioning: true, - LockDuration: 'PT5M', + LockDuration: options.lockDuration || 'PT5M', DefaultMessageTimeToLive: 'P10675199D', - MaxDeliveryCount: '10000000' + MaxDeliveryCount: options.maxDeliveryCount ? options.maxDeliveryCount.toString() : '10000000' }; const deferred = Q.defer(); - this.serviceBusService.createQueueIfNotExists(name, options, (error, created, response) => { + this.serviceBusService.createQueueIfNotExists(name, queueOptions, (error, created, response) => { if (error) { return deferred.reject(error); } diff --git a/test/integration/serviceBusQueueTests.js b/test/integration/serviceBusQueueTests.js new file mode 100644 index 0000000..09c26e2 --- /dev/null +++ b/test/integration/serviceBusQueueTests.js @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +// Run ./node_modules/mocha/bin/mocha test/integration/serviceBusQueueTests.js --timeout 60000 +const config = require('painless-config'); +const { expect } = require('chai'); +const { after, before, describe, it } = require('mocha'); +const { promisify } = require('util'); +const Request = require('../../lib/request'); +const ServiceBusQueue = require('../../providers/queuing/serviceBusQueue'); +const ServiceBusQueueManager = require('../../providers/queuing/serviceBusQueueManager'); + +const connectionString = config.get('CRAWLER_SERVICEBUS_CONNECTION_STRING') || config.get('CRAWLER_SERVICEBUS_MANAGER_ENDPOINT'); +const name = config.get('CRAWLER_NAME'); +const queueName = 'sb-test'; +const formatter = message => { + Request.adopt(message); + return message; +}; +const options = { + logger: { + verbose: console.log, + error: console.error + }, + queueName, + lockDuration: 'PT4S', // 4 sec + lockRenewal: 3000, // 3 sec + maxDeliveryCount: 100, + _config: { on: () => { } } +}; +let serviceBusQueue = null; + +describe('AMQP 1.0 Integration', () => { + before(async () => { + if (!connectionString) { + throw new Error('ServiceBus connectionString not configured.'); + } + const manager = new ServiceBusQueueManager(null, connectionString); + serviceBusQueue = new ServiceBusQueue(manager.serviceBusService, name, queueName, formatter, manager, options); + await serviceBusQueue.subscribe(); + }); + + after(async () => { + await serviceBusQueue.flush(); + }); + + it('Should push, pop and ack a message', async () => { + let info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + const msg = new Request('test1', 'test://test/test1'); + await serviceBusQueue.push(msg); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await setTimeout[promisify.custom](1000); + const request = await serviceBusQueue.pop(); + expect(request).to.exist; + expect(request instanceof Request).to.be.true; + expect(request._timeoutId).to.exist; + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await serviceBusQueue.done(request); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + }); + + it('Should push, pop, nack, pop and ack a message', async () => { + let info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + const msg = new Request('test2', 'test://test/test2'); + await serviceBusQueue.push(msg); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await setTimeout[promisify.custom](4000); + let request = await serviceBusQueue.pop(); + expect(request).to.exist; + expect(request instanceof Request).to.be.true; + expect(request._timeoutId).to.exist; + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await serviceBusQueue.abandon(request); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + request = await serviceBusQueue.pop(); + expect(request).to.exist; + expect(request instanceof Request).to.be.true; + expect(request._timeoutId).to.exist; + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await serviceBusQueue.done(request); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + }); + + it('Should push, pop, wait for lock to be renewed and ack a message', async () => { + let info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + const msg = new Request('test3', 'test://test/test3'); + await serviceBusQueue.push(msg); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + await setTimeout[promisify.custom](4000); + let request = await serviceBusQueue.pop(); + expect(request).to.exist; + expect(request instanceof Request).to.be.true; + expect(request._timeoutId).to.exist; + expect(request._message.brokerProperties.LockedUntilUtc).to.exist; + expect(new Date(request._message.brokerProperties.LockedUntilUtc)).to.be.greaterThan(new Date()); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(1); + + expect(request._renewLockAttemptCount).to.be.undefined; + const timerAsyncId = getTimerAsyncId(request._timeoutId); + await setTimeout[promisify.custom](4000); + expect(request._renewLockAttemptCount).to.equal(1); + expect(getTimerAsyncId(request._timeoutId)).not.to.be.equal(timerAsyncId); + + await serviceBusQueue.done(request); + info = await serviceBusQueue.getInfo(); + expect(Number(info.count)).to.equal(0); + }); +}); + +function getTimerAsyncId(timeoutId) { + return timeoutId[Object.getOwnPropertySymbols(timeoutId)[0]]; +} \ No newline at end of file