зеркало из
1
0
Форкнуть 0

Initial draft version of EventHub client for node.js.

Only APIs are defined at this point, the code is pretty much dummy, implementation will follow.
This commit is contained in:
dcristoloveanu 2015-10-23 16:25:47 -07:00
Коммит c30c619bb8
9 изменённых файлов: 384 добавлений и 0 удалений

1
.jshintignore Normal file
Просмотреть файл

@ -0,0 +1 @@
**/node_modules

20
.jshintrc Normal file
Просмотреть файл

@ -0,0 +1,20 @@
{
"node": true,
"mocha": true,
"bitwise": true,
"curly": false,
"eqeqeq": true,
"forin": true,
"immed": true,
"latedef": false,
"newcap": true,
"noarg": true,
"noempty": true,
"nonew": true,
"regexp": true,
"undef": true,
"unused": true,
"strict": true,
"globalstrict": true,
"trailing": true
}

10
index.js Normal file
Просмотреть файл

@ -0,0 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
module.exports = {
EventData: require('./lib/eventdata'),
Receiver: require("./lib/eventhubreceiver"),
Client: require("./lib/eventhubclient")
};

44
lib/eventdata.js Normal file
Просмотреть файл

@ -0,0 +1,44 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
/**
* @class EventData
* @classdesc Constructs a {@linkcode EventData} object.
* @param {String} bytes The event payload as a byte array.
*/
function EventData(bytes, systemProperties) {
Object.defineProperties(this, {
'PartitionKey':{
value: "",
writable: false
},
'Bytes':{
value: bytes,
writable: false
},
'EnqueuedTimeUtc':{
value: null,
writable: false
},
'Offset':{
value: "",
writable: false
},
'Properties':{
value: null,
writable: true
},
'SequenceNumber':{
value: 0,
writable: false
},
'SystemProperties':{
value: systemProperties,
writable: false
}
});
}
module.exports = EventData;

138
lib/eventhubclient.js Normal file
Просмотреть файл

@ -0,0 +1,138 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var amqp10 = require('amqp10');
var Promise = require('bluebird');
var EventHubData = require('./eventdata');
var EventHubReceiver = require("./eventhubreceiver");
var managementEndpoint = '$management';
function parseEventHubConnString(connString)
{
var config = {
host: '',
eventHubName: '',
keyName: '',
key: ''
};
var configArray = connString.split(';');
configArray.forEach(function(element) {
var res = element.match("Endpoint=sb://([^/]*)");
if (res != null) {
config.host = res[1];
}
res = element.match("SharedAccessKeyName=(.*)");
if (res != null) {
config.keyName = res[1];
}
res = element.match("SharedAccessKey=(.*)");
if (res != null) {
config.key = res[1];
}
res = element.match("EntityPath=(.*)");
if (res != null) {
config.eventHubName = res[1];
}
});
return config;
}
/**
* @class EventHubClient
* @classdesc Constructs a {@linkcode EventHubClient} object with the given connection string
* @param {String} connString The EventHub connection string
* @param {String} path The EventHub path. Optional.
*/
function EventHubClient(connString, path) {
this.config = parseEventHubConnString(connString);
if (!this.config.eventHubName || this.config.eventHubName.length == 0)
{
throw new Error('No event hub name specified');
}
this.uri = 'amqps://' +
encodeURIComponent(this.config.keyName) + ':' +
encodeURIComponent(this.config.key) + '@' +
this.config.host;
this.amqpClient = new amqp10.Client(amqp10.Policy.EventHub);
};
/**
* The [Send]{@linkcode EventHubClient#Send} method sends one Event to the Event Hub.
*/
EventHubClient.prototype.Send = function(event) {
/* This code has to be reworked so that it uses the same amqpClient, there is no need to have 2 clients and thus 2 connecitons open */
var self = this;
var endpoint = '/' + this.config.eventHubName;
return new Promise(function(resolve, reject) {
self.amqpClient.connect(self.uri).then(function() {
self.amqpClient.createSender(endpoint).then(function(sender) {
sender.on('errorReceived', function (tx_err) {
throw new Error('error sending request to Event Hub management endpoint.');
});
var request = { body: event.Bytes, properties: { messageId: '424242' } };
sender.send(request);
resolve();
});
});
});
}
/**
* The [SendBatch]{@linkcode EventHubClient#SendBatch} method sends a batch of Events to the Event Hub.
*/
EventHubClient.prototype.SendBatch = function(events) {
/* Not implemented yet */
}
/**
* The [GetPartitionIds]{@linkcode EventHubClient#GetPartitionIds} method gets the partition Ids for an EventHub.
*/
EventHubClient.prototype.GetPartitionIds = function() {
var self = this;
return new Promise(function(resolve, reject) {
var ehName = self.config.eventHubName;
var rxName = 'eventhubclient-rx';
var rxOptions = { attach: { target: { address: rxName } } };
self.amqpClient.connect(self.uri).then(function() {
/* create a sender to send the request to the $management endpoint */
self.amqpClient.createSender(managementEndpoint).then(function(sender) {
sender.on('errorReceived', function (tx_err) {
throw new Error('error sending request to Event Hub management endpoint.');
});
var request = { body: "stub", properties: { messageId: '424242', replyTo: rxName }, applicationProperties: { operation: "READ", name: ehName, type: "com.microsoft:eventhub" } };
return sender.send(request);
});
/* create a receiver for the management endpoint to receive the partition count */
self.amqpClient.createReceiver(managementEndpoint, rxOptions).then(function(receiver) {
receiver.on('errorReceived', function (rx_err) {
throw new Error('error receiving reply from Event Hub management endpoint.');
});
receiver.on('message', function (msg) {
return resolve(msg.body.partition_ids);
});
});
});
});
}
/**
* The [CreateReceiver]{@linkcode EventHubClient#CreateReceiver} method creates a new
* {@linkcode EventHubReceiver} instance.
* @param {String} consumerGroup The consumer group to use for the new receiver.
* @param {String} partitionId The partition Id to use for the new receiver.
*/
EventHubClient.prototype.CreateReceiver = function(consumerGroup, partitionId) {
return new EventHubReceiver(this.amqpClient, '/' + this.config.eventHubName + '/ConsumerGroups/' + consumerGroup + '/Partitions/' + partitionId);
};
module.exports = EventHubClient;

75
lib/eventhubreceiver.js Normal file
Просмотреть файл

@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var EventData = require("./eventdata");
var Promise = require('bluebird');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
/**
* @class EventHubReceiver
* @classdesc Constructs an {@linkcode EventHubReceiver} object
*/
var EventHubReceiver = function (amqpClient, endpoint) {
this.amqpClient = amqpClient;
this.endpoint = endpoint;
};
util.inherits(EventHubReceiver, EventEmitter);
// On receiver message received
EventHubReceiver.MessageReceived = 'messageReceived';
// On receive error
EventHubReceiver.Error = 'error';
/* Notes: StartReceive shall handle retries
* onError shall be emitted after the retries have been exhausted
* EventHubReceiver shall support redirect
*/
/**
* The [StartReceive]{@linkcode EventHubReceiver#StartReceive} method starts
* receiving events from the event hub for the specified partition.
* @param startTime The startTime to use as filter for the events being received.
*/
EventHubReceiver.prototype.StartReceive = function(startTime) {
var self = this;
return new Promise(function(resolve, reject) {
self.amqpClient.createReceiver(self.endpoint).then(function (amqpReceiver) {
amqpReceiver.on('message', function (message) {
var eventData = new EventData(message.body, message.annotations.value);
self.emit(EventHubReceiver.MessageReceived, eventData);
});
amqpReceiver.on('errorReceived', function(rx_err) {
self.emit(EventHubReceiver.Error, rx_err);
});
});
resolve();
});
};
/**
* The [StartReceiveFromOffset]{@linkcode EventHubReceiver#StartReceiveFromOffset} method starts
* receiving events from the event hub, while filtering events starting at a certian offset.
* @param startOffset The start offset to use as filter for the events being received.
*/
EventHubReceiver.prototype.StartReceiveFromOffset = function(startOffset) {
var self = this;
return new Promise(function(resolve, reject) {
self.amqpClient.createReceiver(self.endpoint).then(function (amqpReceiver) {
amqpReceiver.on('message', function (message) {
var eventData = new EventData(message.body, message.annotations.value);
self.emit(EventHubReceiver.MessageReceived, eventData);
});
amqpReceiver.on('errorReceived', function(rx_err) {
self.emit(EventHubReceiver.Error, rx_err);
});
});
resolve();
});
};
module.exports = EventHubReceiver;

35
package.json Normal file
Просмотреть файл

@ -0,0 +1,35 @@
{
"name": "event-hub-client",
"version": "1.0.0-preview.1",
"description": "Azure EventHub client",
"author": "Microsoft Corporation",
"license": "MIT",
"main": "index.js",
"dependencies": {
"amqp10": "^2.0.1",
"bluebird": "^2.10.2"
},
"devDependencies": {
"jshint": "^2.8.0"
},
"engines": {
"node": "^0.12 || ^4"
},
"repository": {
"type": "git",
"url": "git+https://github.com/Azure/azure-event-hubs.git"
},
"bugs": {
"url": "https://github.com/Azure/azure-event-hubs/issues"
},
"homepage": "https://github.com/Azure/azure-event-hubs#readme",
"scripts": {
"lint": "jshint --show-non-errors .",
"unittest-min": "mocha --reporter dot lib/_*_test.js",
"alltest-min": "mocha --reporter dot lib/_*_test*.js",
"unittest": "mocha --reporter spec lib/_*_test.js",
"alltest": "mocha --reporter spec lib/_*_test*.js",
"ci": "npm -s run lint && npm -s run alltest-min",
"test": "npm -s run lint && npm -s run unittest"
}
}

36
samples/receivesample.js Normal file
Просмотреть файл

@ -0,0 +1,36 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var EventHub = require('../index.js');
function onReceiveMessage(eventData) {
console.log('Message received: ');
console.log(eventData.Bytes);
console.log(eventData.SystemProperties)
console.log('');
}
function onError(error) {
console.log('Receive error:' + error);
}
var ehClient = new EventHub.Client("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz;EntityPath=uuu");
var partitionIds = ehClient.GetPartitionIds().then(function(partitionIds) {
console.log('PartCount=' + partitionIds.length);
var receiver = ehClient.CreateReceiver("$Default", "0");
/* start receiving */
receiver.StartReceive(Date.Now).then(function() {
receiver.on('error', function(error) {
console.log('Receive error:' + error);
});
receiver.on('messageReceived', function(eventData) {
console.log('Message received: ');
console.log(eventData.Bytes);
console.log(eventData.SystemProperties)
console.log('');
});
});
});

25
samples/sendsample.js Normal file
Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
'use strict';
var EventHub = require('../index.js');
function onReceiveMessage(message) {
console.log('Message received: ');
console.log(message.body);
if (message.annotations) {
console.log('Annotations:');
console.log(message.annotations);
}
console.log('');
}
function onError(error) {
console.log('Receive error:' + error);
}
var ehClient = new EventHub.Client("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz;EntityPath=uuu");
var eventData = new EventHub.EventData(new Buffer("test"));
ehClient.Send(eventData).then(function() {
});