зеркало из
1
0
Форкнуть 0

Merge pull request #62 from amarzavery/encode

add support for encode/decode a message as an amqp data section.
This commit is contained in:
Amar Zavery 2018-05-02 10:45:07 -07:00 коммит произвёл GitHub
Родитель 845210b791 2001d32d70
Коммит 6b4b66192a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
19 изменённых файлов: 534 добавлений и 74 удалений

Просмотреть файл

@ -96,6 +96,7 @@ export class BatchingReceiver extends EventHubReceiver {
// Action to be performed on the "message" event.
onReceiveMessage = (context: rheaPromise.Context) => {
const data: EventData = EventData.fromAmqpMessage(context.message!);
data.body = this._context.dataTransformer.decode(context.message!.body);
if (eventDatas.length <= maxMessageCount) {
eventDatas.push(data);
}
@ -117,7 +118,8 @@ export class BatchingReceiver extends EventHubReceiver {
};
const addCreditAndSetTimer = (reuse?: boolean) => {
debug("[%s] Receiver '%s', adding credit for receiving %d messages.", this._context.connectionId, this.name, maxMessageCount);
debug("[%s] Receiver '%s', adding credit for receiving %d messages.",
this._context.connectionId, this.name, maxMessageCount);
this._receiver.add_credit(maxMessageCount);
let msg: string = "[%s] Setting the wait timer for %d seconds for receiver '%s'.";
if (reuse) msg += " Receiver link already present, hence reusing it.";

Просмотреть файл

@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
import * as debugModule from "debug";
import * as uuid from "uuid/v4";
import * as Constants from "./util/constants";
@ -12,6 +11,8 @@ import { TokenProvider } from "./auth/token";
import { ManagementClient } from "./managementClient";
import { CbsClient } from "./cbs";
import { SasTokenProvider } from "./auth/sas";
import { ClientOptions } from "./eventHubClient";
import { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
const debug = debugModule("azure:event-hubs:connectionContext");
@ -33,6 +34,12 @@ export interface ConnectionContext {
* @property {string} [connectionId] The amqp connection id that uniquely identifies the connection within a process.
*/
connectionId?: string;
/**
* @property {DataTransformer} dataTransformer A DataTransformer object that has methods named
* - encode Responsible for encoding the AMQP message before sending it on the wire.
* - decode Responsible for decoding the received AMQP message before passing it to the customer.
*/
dataTransformer: DataTransformer;
/**
* @property {TokenProvider} tokenProvider The TokenProvider to be used for getting tokens for authentication for the EventHub client.
*/
@ -75,17 +82,20 @@ export namespace ConnectionContext {
*/
export const userAgent: string = "/js-event-hubs";
export function create(config: ConnectionConfig, tokenProvider?: TokenProvider): ConnectionContext {
export function create(config: ConnectionConfig, options?: ClientOptions): ConnectionContext {
ConnectionConfig.validate(config);
if (!options) options = {};
const context: ConnectionContext = {
connectionLock: `${Constants.establishConnection}-${uuid()}`,
negotiateClaimLock: `${Constants.negotiateClaim}-${uuid()}`,
config: config,
tokenProvider: tokenProvider || new SasTokenProvider(config.endpoint, config.sharedAccessKeyName, config.sharedAccessKey),
tokenProvider: options.tokenProvider ||
new SasTokenProvider(config.endpoint, config.sharedAccessKeyName, config.sharedAccessKey),
cbsSession: new CbsClient(),
managementSession: new ManagementClient(config.entityPath!),
senders: {},
receivers: {}
receivers: {},
dataTransformer: options.dataTransformer || new DefaultDataTransformer()
};
debug("Created connection context: %O", context);
return context;

95
lib/dataTransformer.ts Normal file
Просмотреть файл

@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
import * as rhea from "rhea";
import * as debugModule from "debug";
const isBuffer = require("is-buffer");
const debug = debugModule("azure:event-hubs:datatransformer");
export interface DataTransformer {
/**
* @property {Function} encode A function that takes the body property from an EventData object
* and returns an encoded body (some form of AMQP type).
*/
encode: (body: any) => any;
/**
* @property {Function} decode A function that takes the body property from an AMQP message
* and returns the decoded message body. If it cannot decode the body then it returns the body
* as-is.
*/
decode: (body: any) => any;
}
/**
* The default data transformer that will be used by the Azure SDK.
*/
export class DefaultDataTransformer implements DataTransformer {
/**
* A function that takes the body property from an EventData object
* and returns an encoded body (some form of AMQP type).
* @method
* @param {*} body The AMQP message body
* @return {DataSection} encodedBody - The encoded AMQP message body as an AMQP Data type
* (data section in rhea terms). Section object with following properties:
* - typecode: 117 (0x75)
* - content: The given AMQP message body as a Buffer.
* - multiple: true | undefined.
*/
encode(body: any): any {
let result: any;
debug("[encode] The given message body that needs to be encoded is: ", body);
if (isBuffer(body)) {
result = rhea.message.data_section(body);
} else {
// string, undefined, null, boolean, array, object, number should end up here
// coercing undefined to null as that will ensure that null value will be given to the
// customer on receive.
if (body === undefined) body = null; // tslint:disable-line
try {
const bodyStr = JSON.stringify(body);
result = rhea.message.data_section(Buffer.from(bodyStr, "utf8"));
} catch (err) {
const msg = `An error occurred while executing JSON.stringify() on the given body ` + body
+ `${err ? err.stack : JSON.stringify(err)}`;
debug("[encode] " + msg);
throw new Error(msg);
}
}
debug("[encode] The encoded message body is: %O.", result);
return result;
}
/**
* @property {Function} [decode] A function that takes the body property from an AMQP message
* (an AMQP Data type (data section in rhea terms)) and returns the decoded message body.
* If it cannot decode the body then it returns the body
* as-is.
* @param {DataSection} body The AMQP message body
* @return {*} decoded body or the given body as-is.
*/
decode(body: any): any {
let processedBody: any = body;
try {
debug("[decode] Received message body for decoding is: %O", body);
if (body.content && isBuffer(body.content)) {
// This indicates that we are getting the AMQP described type. Let us try decoding it.
processedBody = body.content;
}
try {
// Trying to stringify and JSON.parse() anything else will fail flat and we shall return
// the original type back
const bodyStr: string = processedBody.toString("utf8");
processedBody = JSON.parse(bodyStr);
} catch (err) {
debug("[decode] An error occurred while trying JSON.parse() on the received body. " +
"The error is %O", err);
}
} catch (err) {
debug("[decode] An error occurred while decoding the received message body. The error is: %O", err);
}
debug("[decode] The decoded message body is: %O", processedBody);
return processedBody;
}
}

Просмотреть файл

@ -6,7 +6,7 @@ import * as debugModule from "debug";
import { BlobLeaseManager, LeaseManager } from "./blobLeaseManager";
import { BlobLease, Lease } from "./blobLease";
import { PartitionContext } from "./partitionContext";
import { EventHubClient } from "../eventHubClient";
import { EventHubClient, ClientOptions } from "../eventHubClient";
import { EventEmitter } from "events";
import {
TokenProvider, EventHubRuntimeInformation, EventHubPartitionRuntimeInformation,
@ -322,8 +322,9 @@ export class EventProcessorHost extends EventEmitter {
private async _attachReceiver(partitionId: string): Promise<ReceiveHandler> {
const context = this._contextByPartition![partitionId];
if (!context)
if (!context) {
throw new Error(`Invalid state - missing context for partition "${partitionId}".`);
}
const checkpoint = await context.updateCheckpointInfoFromLease();
let eventPosition: EventPosition | undefined = undefined;
if (checkpoint && checkpoint.offset) {
@ -413,9 +414,12 @@ export class EventProcessorHost extends EventEmitter {
eventHubConnectionString: string,
options?: ConnectionStringBasedOptions): EventProcessorHost {
if (!options) options = {};
const ehCOptions: ClientOptions = {
tokenProvider: options.tokenProvider
};
return new EventProcessorHost(hostName, storageConnectionString,
EventHubClient.createFromConnectionString(eventHubConnectionString, options.eventHubPath,
options.tokenProvider), options);
ehCOptions), options);
}
/**

Просмотреть файл

@ -132,12 +132,15 @@ export class PartitionContext {
updateCheckpointDataFromEventData(eventData: EventData): void {
if (eventData && eventData.annotations) {
const anno = eventData.annotations;
if (anno[Constants.enqueuedTime])
if (anno[Constants.enqueuedTime]) {
this._checkpointDetails.epoch = anno[Constants.enqueuedTime] as number;
if (anno[Constants.offset])
}
if (anno[Constants.offset]) {
this._checkpointDetails.offset = anno[Constants.offset] as string;
if (anno[Constants.sequenceNumber])
}
if (anno[Constants.sequenceNumber]) {
this._checkpointDetails.sequenceNumber = anno[Constants.sequenceNumber] as number;
}
debug("Updated checkpoint data from event data is: %O", this._checkpointDetails);
}
}

Просмотреть файл

@ -357,10 +357,11 @@ export function translate(err: AmqpError | Error): EventHubsError {
const error = new EventHubsError(description);
error.condition = condition;
if (condition) {
if (condition === "com.microsoft:precondition-failed")
if (condition === "com.microsoft:precondition-failed") {
error.name = "PreconditionFailedError";
else
} else {
error.name = ConditionErrorNameMapper[condition as any] || "EventHubsError";
}
}
if (description &&
(description.includes("status-code: 404") ||

Просмотреть файл

@ -4,7 +4,7 @@
import * as debugModule from "debug";
import { closeConnection, Delivery } from "./rhea-promise";
import { ApplicationTokenCredentials, DeviceTokenCredentials, UserTokenCredentials, MSITokenCredentials } from "ms-rest-azure";
import { ConnectionConfig, OnMessage, OnError, EventData, EventHubsError } from ".";
import { ConnectionConfig, OnMessage, OnError, EventData, EventHubsError, DataTransformer } from ".";
import * as rpc from "./rpc";
import { ConnectionContext } from "./connectionContext";
import { TokenProvider } from "./auth/token";
@ -50,6 +50,32 @@ export interface ReceiveOptions {
enableReceiverRuntimeMetric?: boolean;
}
/**
* Describes the base client options.
* @interface ClientOptionsBase
*/
export interface ClientOptionsBase {
/**
* @property {DataTransformer} [dataTransformer] The data transformer that will be used to encode
* and decode the sent and received messages respectively. If not provided then we will use the
* DefaultDataTransformer. The default transformer should handle majority of the cases. This
* option needs to be used only for specialized scenarios.
*/
dataTransformer?: DataTransformer;
}
/**
* Describes the options that can be provided while creating the EventHub Client.
* @interface ClientOptions
*/
export interface ClientOptions extends ClientOptionsBase {
/**
* @property {TokenProvider} [tokenProvider] - The token provider that provides the token for authentication.
* Default value: SasTokenProvider.
*/
tokenProvider?: TokenProvider;
}
/**
* @class EventHubClient
* Describes the EventHub client.
@ -67,15 +93,16 @@ export class EventHubClient {
private _context: ConnectionContext;
/**
* Instantiate a client pointing to the Event Hub given by this configuration.
* Instantiates a client pointing to the Event Hub given by this configuration.
*
* @constructor
* @param {ConnectionConfig} config - The connection configuration to create the EventHub Client.
* @param {TokenProvider} [tokenProvider] - The token provider that provides the token for authentication.
* Default value: SasTokenProvider.
*/
constructor(config: ConnectionConfig, tokenProvider?: TokenProvider) {
this._context = ConnectionContext.create(config, tokenProvider);
constructor(config: ConnectionConfig, options?: ClientOptions) {
if (!options) options = {};
this._context = ConnectionContext.create(config, options);
}
/**
@ -281,10 +308,11 @@ export class EventHubClient {
* @method createFromConnectionString
* @param {string} connectionString - Connection string of the form 'Endpoint=sb://my-servicebus-namespace.servicebus.windows.net/;SharedAccessKeyName=my-SA-name;SharedAccessKey=my-SA-key'
* @param {string} [path] - EventHub path of the form 'my-event-hub-name'
* @param {TokenProvider} [tokenProvider] - An instance of the token provider that provides the token for authentication. Default value: SasTokenProvider.
* @param {ClientOptions} [options] Options that can be provided during client creation.
* @param {TokenProvider} [options.tokenProvider] - An instance of the token provider that provides the token for authentication. Default value: SasTokenProvider.
* @returns {EventHubClient} - An instance of the eventhub client.
*/
static createFromConnectionString(connectionString: string, path?: string, tokenProvider?: TokenProvider): EventHubClient {
static createFromConnectionString(connectionString: string, path?: string, options?: ClientOptions): EventHubClient {
if (!connectionString || (connectionString && typeof connectionString !== "string")) {
throw new Error("'connectionString' is a required parameter and must be of type: 'string'.");
}
@ -293,7 +321,7 @@ export class EventHubClient {
if (!config.entityPath) {
throw new Error(`Either the connectionString must have "EntityPath=<path-to-entity>" or you must provide "path", while creating the client`);
}
return new EventHubClient(config, tokenProvider);
return new EventHubClient(config, options);
}
/**
@ -302,8 +330,14 @@ export class EventHubClient {
* @param {string} host - Fully qualified domain name for Event Hubs. Most likely, {yournamespace}.servicebus.windows.net
* @param {string} entityPath - EventHub path of the form 'my-event-hub-name'
* @param {TokenCredentials} credentials - The AAD Token credentials. It can be one of the following: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials.
* @param {ClientOptionsBase} options - The options that can be provided during client creation.
* @returns {EventHubClient} An instance of the Eventhub client.
*/
static createFromAadTokenCredentials(host: string, entityPath: string, credentials: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials): EventHubClient {
static createFromAadTokenCredentials(
host: string,
entityPath: string,
credentials: ApplicationTokenCredentials | UserTokenCredentials | DeviceTokenCredentials | MSITokenCredentials,
options?: ClientOptionsBase): EventHubClient {
if (!host || (host && typeof host !== "string")) {
throw new Error("'host' is a required parameter and must be of type: 'string'.");
}
@ -322,7 +356,9 @@ export class EventHubClient {
if (!host.endsWith("/")) host += "/";
const connectionString = `Endpoint=sb://${host};SharedAccessKeyName=defaultKeyName;SharedAccessKey=defaultKeyValue`;
const tokenProvider = new AadTokenProvider(credentials);
return EventHubClient.createFromConnectionString(connectionString, entityPath, tokenProvider);
if (!options) options = {};
const clientOptions: ClientOptions = options;
clientOptions.tokenProvider = new AadTokenProvider(credentials);
return EventHubClient.createFromConnectionString(connectionString, entityPath, clientOptions);
}
}

Просмотреть файл

@ -187,6 +187,8 @@ export class EventHubReceiver {
};
this._onAmqpMessage = (context: rheaPromise.Context) => {
const evData = EventData.fromAmqpMessage(context.message!);
evData.body = this._context.dataTransformer.decode(context.message!.body);
if (this.receiverRuntimeMetricEnabled && evData) {
this.runtimeInfo.lastSequenceNumber = evData.lastSequenceNumber;
this.runtimeInfo.lastEnqueuedTimeUtc = evData.lastEnqueuedTime;

Просмотреть файл

@ -102,8 +102,8 @@ export class EventHubSender {
"possibly the connection.", this.senderLock);
await defaultLock.acquire(this.senderLock, () => { return this._init(); });
}
const message = EventData.toAmqpMessage(data);
message.body = this._context.dataTransformer.encode(data.body);
return await this._trySend(message);
} catch (err) {
debug("An error occurred while sending the message %O", err);
@ -135,6 +135,7 @@ export class EventHubSender {
// Convert EventData to AmqpMessage.
for (let i = 0; i < datas.length; i++) {
const message = EventData.toAmqpMessage(datas[i]);
message.body = this._context.dataTransformer.encode(datas[i].body);
messages[i] = message;
}
// Encode every amqp message and then convert every encoded message to amqp data section

Просмотреть файл

@ -16,6 +16,7 @@ export {
ConnectionStringBasedOptions, EventProcessorOptions
} from "./eph";
export { EventPosition } from "./eventPosition";
export { DataTransformer, DefaultDataTransformer } from "./dataTransformer";
export { EventHubPartitionRuntimeInformation, EventHubRuntimeInformation } from "./managementClient";
export { TokenType, TokenProvider, TokenInfo } from "./auth/token";
export { aadEventHubsAudience } from "./util/constants";

Просмотреть файл

@ -9,7 +9,6 @@ import { RequestResponseLink, createRequestResponseLink, sendRequest } from "./r
import { defaultLock } from "./util/utils";
import { AmqpMessage } from ".";
const Buffer = require("buffer/").Buffer;
const debug = debugModule("azure:event-hubs:management");
export interface EventHubRuntimeInformation {

32
package-lock.json сгенерированный
Просмотреть файл

@ -243,11 +243,6 @@
"integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=",
"dev": true
},
"base64-js": {
"version": "1.2.3",
"resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.2.3.tgz",
"integrity": "sha512-MsAhsUW1GxCdgYSO6tAfZrNapmUKk7mWx/k5mFY/A1gBtkaCaNapTg+FExCw1r9yeaZhqx/xPg43xgTFH6KL5w=="
},
"base64url": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/base64url/-/base64url-2.0.0.tgz",
@ -291,15 +286,6 @@
"resolved": "https://registry.npmjs.org/browserify-mime/-/browserify-mime-1.2.9.tgz",
"integrity": "sha1-rrGvKN5sDXpqLOQK22j/GEIq8x8="
},
"buffer": {
"version": "5.1.0",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.1.0.tgz",
"integrity": "sha512-YkIRgwsZwJWTnyQrsBTWefizHh+8GYj3kbL1BTiAQ/9pwpino0G7B2gp5tx/FUBqUlvtxV85KNR3mwfAtv15Yw==",
"requires": {
"base64-js": "^1.0.2",
"ieee754": "^1.1.4"
}
},
"buffer-equal-constant-time": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/buffer-equal-constant-time/-/buffer-equal-constant-time-1.0.1.tgz",
@ -672,11 +658,6 @@
"sshpk": "^1.7.0"
}
},
"ieee754": {
"version": "1.1.8",
"resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.1.8.tgz",
"integrity": "sha1-vjPUCsEO8ZJnAfbwii2G+/0a0+Q="
},
"inflight": {
"version": "1.0.6",
"resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz",
@ -693,9 +674,9 @@
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4="
},
"is-buffer": {
"version": "1.1.6",
"resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.6.tgz",
"integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w=="
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-2.0.2.tgz",
"integrity": "sha512-imvkm8cOGKeZ/NwkAd+FAURi0hsL9gr3kvdi0r3MnqChcOdPaQRIOQiOU+sD40XzUIe6nFmSHYtQjbkDvaQbEg=="
},
"is-stream": {
"version": "1.1.0",
@ -929,6 +910,13 @@
"through": "^2.3.8",
"tunnel": "0.0.5",
"uuid": "^3.2.1"
},
"dependencies": {
"is-buffer": {
"version": "1.1.6",
"resolved": "https://registry.npmjs.org/is-buffer/-/is-buffer-1.1.6.tgz",
"integrity": "sha512-NcdALwpXkTm5Zvvbk7owOUSvVvBKDgKP5/ewfXEznmQFfs4ZRmanOeKBTjRVjka3QFoN6XJ+9F3USqfHqTaU5w=="
}
}
},
"ms-rest-azure": {

Просмотреть файл

@ -7,8 +7,8 @@
"main": "./dist/lib/index.js",
"types": "./typings/lib/index.d.ts",
"dependencies": {
"is-buffer": "2.0.2",
"async-lock": "^1.1.2",
"buffer": "^5.1.0",
"debug": "^3.1.0",
"ms-rest": "^2.3.3",
"ms-rest-azure": "^2.5.5",

Просмотреть файл

@ -0,0 +1,198 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import * as chai from "chai";
const should = chai.should();
import * as assert from "assert";
const isBuffer = require("is-buffer");
// import * as debugModule from "debug";
// const debug = debugModule("azure:event-hubs:datatransformer-spec");
import { DefaultDataTransformer } from "../lib";
describe("DataTransformer", function () {
const objectBody: any = {
id: '123-456-789',
weight: 10,
isBlue: true,
siblings: [
{
id: '098-789-564',
weight: 20,
isBlue: false,
}
]
};
const arrayBody = [
{
id: '098-789-564',
weight: 20,
isBlue: false,
},
10,
20,
"some string"
];
const stringBody: string = "some string";
const booleanBody: boolean = true;
const numberBody: number = 10.20;
const nullBody: null = null;
const undefinedBody: undefined = undefined;
const emptyStringBody: string = "";
const bufferbody: Buffer = Buffer.from("zzz", "utf8");
const hexBufferBody: Buffer = Buffer.from("7468697320697320612074c3a97374", "hex");
const transformer = new DefaultDataTransformer();
it("should correctly encode/decode a string message body", function (done) {
const encoded: any = transformer.encode(stringBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
decoded.should.equal(stringBody);
done();
});
it("should correctly encode/decode a number message body", function (done) {
const encoded: any = transformer.encode(numberBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
decoded.should.equal(numberBody);
done();
});
it("should correctly encode/decode a boolean message body", function (done) {
const encoded: any = transformer.encode(booleanBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
decoded.should.equal(booleanBody);
done();
});
it("should correctly encode/decode a null message body", function (done) {
const encoded: any = transformer.encode(nullBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
should.equal(decoded, nullBody);
done();
});
it("should correctly encode/decode an undefined message body", function (done) {
const encoded: any = transformer.encode(undefinedBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
should.equal(decoded, nullBody);
done();
});
it("should correctly encode/decode an empty string message body", function (done) {
const encoded: any = transformer.encode(emptyStringBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
decoded.should.equal(emptyStringBody);
done();
});
it("should correctly encode/decode an array message body", function (done) {
const encoded: any = transformer.encode(arrayBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
assert.deepEqual(decoded, arrayBody);
done();
});
it("should correctly encode/decode an object message body", function (done) {
const encoded: any = transformer.encode(objectBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
assert.deepEqual(decoded, objectBody);
done();
});
it("should correctly encode/decode a buffer message body", function (done) {
const encoded: any = transformer.encode(bufferbody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
assert.deepEqual(decoded, bufferbody);
done();
});
it("should correctly encode/decode a hex buffer message body", function (done) {
const encoded: any = transformer.encode(hexBufferBody);
encoded.typecode.should.equal(117);
isBuffer(encoded.content).should.equal(true);
const decoded: any = transformer.decode(encoded);
assert.deepEqual(decoded, hexBufferBody);
done();
});
describe("decode", function () {
// It is possible that we receive an AMQP value type from the messages that were sent with
// previously shipped version of the sdk. If so then we should be able to handle those scenarios.
it("should correctly decode a string message body", function (done) {
const decoded: any = transformer.decode(stringBody);
decoded.should.equal(stringBody);
done();
});
it("should correctly decode a number message body", function (done) {
const decoded: any = transformer.decode(numberBody);
decoded.should.equal(numberBody);
done();
});
it("should correctly decode a boolean message body", function (done) {
const decoded: any = transformer.decode(booleanBody);
decoded.should.equal(booleanBody);
done();
});
it("should correctly decode a null message body", function (done) {
const decoded: any = transformer.decode(nullBody);
should.equal(decoded, nullBody);
done();
});
it("should correctly decode an undefined message body", function (done) {
const decoded: any = transformer.decode(undefinedBody);
should.equal(decoded, undefined);
done();
});
it("should correctly decode an empty string message body", function (done) {
const decoded: any = transformer.decode(emptyStringBody);
decoded.should.equal(emptyStringBody);
done();
});
it("should correctly decode an array message body", function (done) {
const decoded: any = transformer.decode(arrayBody);
assert.deepEqual(decoded, arrayBody);
done();
});
it("should correctly decode an object message body", function (done) {
const decoded: any = transformer.decode(objectBody);
assert.deepEqual(decoded, objectBody);
done();
});
it("should correctly decode a buffer message body", function (done) {
const decoded: any = transformer.decode(bufferbody);
assert.deepEqual(decoded, bufferbody);
done();
});
it("should correctly decode a hex buffer message body", function (done) {
const decoded: any = transformer.decode(hexBufferBody);
assert.deepEqual(decoded, hexBufferBody);
done();
});
});
});

Просмотреть файл

@ -9,7 +9,7 @@ import * as debugModule from "debug";
import * as uuid from "uuid/v4";
const debug = debugModule("azure:event-hubs:lease-spec");
import { BlobLease } from "../lib/eph/blobLease";
import { parseConnectionString, StorageConnectionString } from "../lib/util/utils";
import { parseConnectionString, StorageConnectionStringModel } from "../lib/util/utils";
import * as dotenv from "dotenv";
dotenv.config();
@ -19,7 +19,7 @@ describe("Blob Lease", function () {
"define STORAGE_CONNECTION_STRING in your environment before running integration tests.");
});
const storageConnString = process.env.STORAGE_CONNECTION_STRING;
const config = parseConnectionString<StorageConnectionString>(storageConnString!);
const config = parseConnectionString<StorageConnectionStringModel>(storageConnString!);
it("should acquire and release a lease", async function () {
const blobName = "testblob-" + uuid();

Просмотреть файл

@ -10,7 +10,7 @@ import * as uuid from "uuid/v4";
const debug = debugModule("azure:event-hubs:lease-spec");
import { BlobLease } from "../lib/eph/blobLease";
import { BlobLeaseManager } from "../lib/eph/blobLeaseManager";
import { parseConnectionString, StorageConnectionString } from "../lib/util/utils";
import { parseConnectionString, StorageConnectionStringModel } from "../lib/util/utils";
import * as dotenv from "dotenv";
dotenv.config();
@ -21,7 +21,7 @@ describe("Blob Lease Manager", function () {
"define STORAGE_CONNECTION_STRING in your environment before running integration tests.");
});
const storageConnString = process.env.STORAGE_CONNECTION_STRING;
const config = parseConnectionString<StorageConnectionString>(storageConnString!);
const config = parseConnectionString<StorageConnectionStringModel>(storageConnString!);
it("should allow lease takeover", function (done) {
const blobName = "testblob-" + uuid();

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
import * as chai from "chai";
import * as assert from "assert";
const should = chai.should();
import * as chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
@ -36,23 +37,95 @@ describe("Misc tests", function () {
const obj: EventData = { body: msgBody };
debug("Sending one message with %d bytes.", bodysize);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
let datas = await breceiver.receive(5, 5);
datas.length.should.equal(0);
let data = await breceiver.receive(5, 5);
data.length.should.equal(0);
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
datas = await breceiver.receive(5, 10);
debug("received message: ", datas);
should.exist(datas);
datas.length.should.equal(1);
datas[0].body.toString().should.equal(msgString);
data = await breceiver.receive(5, 10);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data.length.should.equal(1);
data[0].body.toString().should.equal(msgString);
});
it("should be able to send and receive a JSON object as a message correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const msgBody = {
id: '123-456-789',
weight: 10,
isBlue: true,
siblings: [
{
id: '098-789-564',
weight: 20,
isBlue: false,
}
]
};
const obj: EventData = { body: msgBody };
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
});
it("should be able to send and receive an array as a message correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const msgBody = [
{
id: '098-789-564',
weight: 20,
isBlue: false,
},
10,
20,
"some string"
];
const obj: EventData = { body: msgBody };
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
});
it("should be able to send a boolean as a message correctly", async function () {
const partitionId = hubInfo.partitionIds[0];
const msgBody = true;
const obj: EventData = { body: msgBody };
debug("Sending one message %O", obj);
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
await client.send(obj, partitionId);
debug("Successfully sent the large message.");
const data = await breceiver.receive(5, 5);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data.length.should.equal(1);
debug("Received message: %O", data);
assert.deepEqual(data[0].body, msgBody);
});
it("should be able to send and receive batched messages correctly", async function () {
try {
const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
let datas = await breceiver.receive(5, 10);
datas.length.should.equal(0);
let data = await breceiver.receive(5, 10);
data.length.should.equal(0);
const messageCount = 5;
let d: EventData[] = [];
for (let i = 0; i < messageCount; i++) {
@ -63,10 +136,53 @@ describe("Misc tests", function () {
await client.sendBatch(d, partitionId);
debug("Successfully sent 5 messages batched together.");
datas = await breceiver.receive(5, 10);
debug("received message: ", datas);
should.exist(datas);
datas.length.should.equal(5);
data = await breceiver.receive(5, 10);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data.length.should.equal(5);
} catch (err) {
debug("should not have happened, uber catch....", err);
throw err;
}
});
it("should be able to send and receive batched messages correctly", async function () {
try {
const partitionId = hubInfo.partitionIds[0];
breceiver = BatchingReceiver.create((client as any)._context, partitionId, { eventPosition: EventPosition.fromEnqueuedTime(Date.now()) });
let data = await breceiver.receive(5, 5);
data.length.should.equal(0);
const messageCount = 5;
let d: EventData[] = [];
for (let i = 0; i < messageCount; i++) {
let obj: EventData = {
body: {
id: '123-456-789',
count: i,
weight: 10,
isBlue: true,
siblings: [
{
id: '098-789-564',
weight: 20,
isBlue: false,
}
]
}
};
d.push(obj);
}
d[0].partitionKey = 'pk1234656';
await client.sendBatch(d, partitionId);
debug("Successfully sent 5 messages batched together.");
data = await breceiver.receive(5, 5);
await breceiver.close();
debug("received message: ", data);
should.exist(data);
data[0].body.count.should.equal(0);
data.length.should.equal(5);
} catch (err) {
debug("should not have happened, uber catch....", err);
throw err;
@ -75,7 +191,7 @@ describe("Misc tests", function () {
it("should consistently send messages with partitionkey to a partitionId", async function () {
const msgToSendCount = 50;
let partitionOffsets = {};
let partitionOffsets: any = {};
debug("Discovering end of stream on each partition.");
const partitionIds = hubInfo.partitionIds;
for (let id of partitionIds) {
@ -84,7 +200,7 @@ describe("Misc tests", function () {
debug(`Partition ${id} has last message with offset ${pInfo.lastEnqueuedOffset}.`);
}
debug("Sending %d messages.", msgToSendCount);
function getRandomInt(max) {
function getRandomInt(max: number) {
return Math.floor(Math.random() * Math.floor(max));
}
for (let i = 0; i < msgToSendCount; i++) {
@ -92,12 +208,12 @@ describe("Misc tests", function () {
await client.send({ body: "Hello EventHub " + i, partitionKey: partitionKey.toString() });
}
debug("Starting to receive all messages from each partition.");
let partitionMap = {};
let partitionMap: any = {};
let totalReceived = 0;
for (let id of partitionIds) {
let datas = await client.receiveBatch(id, 50, 10, { eventPosition: EventPosition.fromOffset(partitionOffsets[id]) });
debug(`Received ${datas.length} messages from partition ${id}.`);
for (let d of datas) {
let data = await client.receiveBatch(id, 50, 10, { eventPosition: EventPosition.fromOffset(partitionOffsets[id]) });
debug(`Received ${data.length} messages from partition ${id}.`);
for (let d of data) {
debug(">>>> _raw_amqp_mesage: ", d._raw_amqp_mesage)
const pk = d.partitionKey as string;
debug("pk: ", pk);
@ -107,7 +223,7 @@ describe("Misc tests", function () {
partitionMap[pk] = id;
debug("partitionMap ", partitionMap);
}
totalReceived += datas.length;
totalReceived += data.length;
}
totalReceived.should.equal(msgToSendCount);
});

Просмотреть файл

@ -12,7 +12,7 @@ const debug = debugModule("azure:event-hubs:lease-spec");
import { PartitionContext } from "../lib/eph/partitionContext"
import { BlobLease } from "../lib/eph/blobLease";
import { BlobLeaseManager } from "../lib/eph/blobLeaseManager";
import { parseConnectionString, StorageConnectionString } from "../lib/util/utils";
import { parseConnectionString, StorageConnectionStringModel } from "../lib/util/utils";
import * as dotenv from "dotenv";
dotenv.config();
@ -23,7 +23,7 @@ describe("Partition Context", function () {
"define STORAGE_CONNECTION_STRING in your environment before running integration tests.");
});
const storageConnString = process.env.STORAGE_CONNECTION_STRING;
const config = parseConnectionString<StorageConnectionString>(storageConnString!);
const config = parseConnectionString<StorageConnectionStringModel>(storageConnString!);
it("should allow lease takeover", function (done) {
const blobName = "testblob-" + uuid();

Просмотреть файл

@ -83,6 +83,10 @@
true,
"always"
],
"curly": [
true,
"ignore-same-line"
],
"variable-name": [
true,
"check-format",