[Service Bus] "message" word added to "createBatch", "CreateBatchOptions" and "tryAdd" (#11887)

This commit is contained in:
Mohsin Mehmood 2020-10-19 13:00:53 +08:00 коммит произвёл GitHub
Родитель 9c1576eba5
Коммит 92affe2c10
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
16 изменённых файлов: 78 добавлений и 72 удалений

Просмотреть файл

@ -9,15 +9,28 @@
and
[PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
- Added new "userId" property to `ServiceBusMessage` interface. [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
- `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
### Breaking changes
- The `createBatch` method on the sender is renamed to `createMesageBatch`
- The interface `CreateBatchOptions` followed by the options that are passed to the `createBatch` method is renamed to `CreateMessageBatchOptions`
- The `tryAdd` method on the message batch object is renamed to `tryAddMessage`
- `ServiceBusMessage` interface updates:
- "properties" renamed to "applicationProperties"
- "label" renamed to "subject"
- `CorrelationRuleFilter` interface updates:
- "properties" renamed to "applicationProperties"
- "label" renamed to "subject"
- `SqlRuleFilter` interface "sqlExpression" changed from optional to required
## 7.0.0-preview.7 (2020-10-07)
- [Bug Fix] `sendMessages` method on the sender would have previously thrown an error for sending a batch or an array of messages upon a network disconnect, the issue has been fixed now.
[PR 11651](https://github.com/Azure/azure-sdk-for-js/pull/11651/commits/f262e4562eb78828ee816a54f9a9778692e0eff9)
- Added new "userId" property to `ServiceBusMessage` interface. [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
- `NamespaceProperties` interface property "messageSku" type changed from "string" to string literal type "Basic" | "Premium" | "Standard". [PR 11810](https://github.com/Azure/azure-sdk-for-js/pull/11810)
### New features:
- Message locks can be auto-renewed in all receive methods (receiver.receiveMessages, receiver.subcribe
@ -36,13 +49,6 @@
- `acceptSession`, which opens a session by name
- `acceptNextSession`, which opens the next available session, determined by Service Bus.
- as part of this `CreateSessionReceiverOptions` has been renamed to `AcceptSessionReceiverOptions` to conform to guidelines.
- `ServiceBusMessage` interface updates:
- "properties" renamed to "applicationProperties"
- "label" renamed to "subject"
- `CorrelationRuleFilter` interface updates:
- "properties" renamed to "applicationProperties"
- "label" renamed to "subject"
- `SqlRuleFilter` interface "sqlExpression" changed from optional to required
## 7.0.0-preview.6 (2020-09-10)

Просмотреть файл

@ -100,7 +100,7 @@ export interface CorrelationRuleFilter {
}
// @public
export interface CreateBatchOptions extends OperationOptionsBase {
export interface CreateMessageBatchOptions extends OperationOptionsBase {
maxSizeInBytes?: number;
}
@ -391,7 +391,7 @@ export interface ServiceBusMessageBatch {
// @internal
readonly _messageSpanContexts: SpanContext[];
readonly sizeInBytes: number;
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
tryAddMessage(message: ServiceBusMessage, options?: TryAddOptions): boolean;
}
// @public
@ -443,7 +443,7 @@ export interface ServiceBusReceiver<ReceivedMessageT> {
export interface ServiceBusSender {
cancelScheduledMessages(sequenceNumbers: Long | Long[], options?: OperationOptionsBase): Promise<void>;
close(): Promise<void>;
createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch>;
createMessageBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch>;
entityPath: string;
isClosed: boolean;
open(options?: OperationOptionsBase): Promise<void>;

Просмотреть файл

@ -33,7 +33,7 @@ import { LinkEntity } from "./linkEntity";
import { getUniqueName, waitForTimeoutOrAbortOrResolve } from "../util/utils";
import { throwErrorIfConnectionClosed } from "../util/errors";
import { ServiceBusMessageBatch, ServiceBusMessageBatchImpl } from "../serviceBusMessageBatch";
import { CreateBatchOptions } from "../models";
import { CreateMessageBatchOptions } from "../models";
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
import { AbortSignalLike } from "@azure/abort-controller";
@ -508,7 +508,7 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
});
}
async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
async createBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch> {
throwErrorIfConnectionClosed(this._context);
let maxMessageSize = await this.getMaxMessageSize({
retryOptions: this._retryOptions,

Просмотреть файл

@ -17,7 +17,7 @@ export { Delivery, WebSocketImpl } from "rhea-promise";
export { ServiceBusClientOptions } from "./constructorHelpers";
export { CorrelationRuleFilter } from "./core/managementClient";
export {
CreateBatchOptions,
CreateMessageBatchOptions,
CreateReceiverOptions,
AcceptSessionOptions,
GetMessageIteratorOptions,

Просмотреть файл

@ -103,7 +103,7 @@ export interface CreateReceiverOptions<ReceiveModeT extends ReceiveMode> {
* }
* ```
*/
export interface CreateBatchOptions extends OperationOptionsBase {
export interface CreateMessageBatchOptions extends OperationOptionsBase {
/**
* @property
* The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.

Просмотреть файл

@ -12,7 +12,7 @@ import {
throwTypeErrorIfParameterNotLong
} from "./util/errors";
import { ServiceBusMessageBatch } from "./serviceBusMessageBatch";
import { CreateBatchOptions } from "./models";
import { CreateMessageBatchOptions } from "./models";
import {
MessagingError,
RetryConfig,
@ -62,12 +62,12 @@ export interface ServiceBusSender {
* @param options Configures the behavior of the batch.
* - `maxSizeInBytes`: The upper limit for the size of batch. The `tryAdd` function will return `false` after this limit is reached.
*
* @param {CreateBatchOptions} [options]
* @param {CreateMessageBatchOptions} [options]
* @returns {Promise<ServiceBusMessageBatch>}
* @throws MessagingError if an error is encountered while sending a message.
* @throws Error if the underlying connection or sender has been closed.
*/
createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch>;
createMessageBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch>;
/**
* Opens the AMQP link to Azure Service Bus from the sender.
@ -195,12 +195,12 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
}
let batch: ServiceBusMessageBatch;
if (Array.isArray(messages)) {
batch = await this.createBatch(options);
batch = await this.createMessageBatch(options);
for (const message of messages) {
if (!isServiceBusMessage(message)) {
throw new TypeError(invalidTypeErrMsg);
}
if (!batch.tryAdd(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
if (!batch.tryAddMessage(message, { parentSpan: getParentSpan(options?.tracingOptions) })) {
// this is too big - throw an error
const error = new MessagingError(
"Messages were too big to fit in a single batch. Remove some messages and try again or create your own batch using createBatch(), which gives more fine-grained control."
@ -238,7 +238,7 @@ export class ServiceBusSenderImpl implements ServiceBusSender {
}
}
async createBatch(options?: CreateBatchOptions): Promise<ServiceBusMessageBatch> {
async createMessageBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch> {
this._throwIfSenderOrConnectionClosed();
return this._sender.createBatch(options);
}
@ -366,7 +366,7 @@ export function isServiceBusMessageBatch(
const possibleBatch = messageBatchOrAnything as ServiceBusMessageBatch;
return (
typeof possibleBatch.tryAdd === "function" &&
typeof possibleBatch.tryAddMessage === "function" &&
typeof possibleBatch.maxSizeInBytes === "number" &&
typeof possibleBatch.sizeInBytes === "number"
);

Просмотреть файл

@ -77,7 +77,7 @@ export interface ServiceBusMessageBatch {
* @param message An individual service bus message.
* @returns A boolean value indicating if the message has been added to the batch or not.
*/
tryAdd(message: ServiceBusMessage, options?: TryAddOptions): boolean;
tryAddMessage(message: ServiceBusMessage, options?: TryAddOptions): boolean;
/**
* The AMQP message containing encoded events that were added to the batch.
@ -243,7 +243,7 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
* @param message An individual service bus message.
* @returns A boolean value indicating if the message has been added to the batch or not.
*/
public tryAdd(message: ServiceBusMessage, options: TryAddOptions = {}): boolean {
public tryAddMessage(message: ServiceBusMessage, options: TryAddOptions = {}): boolean {
throwTypeErrorIfParameterMissing(this._context.connectionId, "message", message);
if (!isServiceBusMessage(message)) {
throw new TypeError("Provided value for 'message' must be of type ServiceBusMessage.");

Просмотреть файл

@ -663,9 +663,9 @@ describe("Batching Receiver", () => {
// See https://github.com/Azure/azure-service-bus-node/issues/31
async function testSequentialReceiveBatchCalls(): Promise<void> {
const testMessages = entityNames.usesSessions ? messageWithSessions : messages;
const batchMessageToSend = await sender.createBatch();
const batchMessageToSend = await sender.createMessageBatch();
for (const message of testMessages) {
batchMessageToSend.tryAdd(message);
batchMessageToSend.tryAddMessage(message);
}
await sender.sendMessages(batchMessageToSend);
const msgs1 = await receiver.receiveMessages(1);

Просмотреть файл

@ -50,13 +50,13 @@ describe("sender unit tests", () => {
["hello", {}, null, undefined].forEach((invalidValue) => {
it(`don't allow tryAdd(${invalidValue})`, async () => {
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
let expectedErrorMsg = "Provided value for 'message' must be of type ServiceBusMessage.";
if (invalidValue === null || invalidValue === undefined) {
expectedErrorMsg = `Missing parameter "message"`;
}
try {
batch.tryAdd(
batch.tryAddMessage(
// @ts-expect-error
invalidValue
);

Просмотреть файл

@ -267,15 +267,15 @@ describe("Retries - MessageSender", () => {
it("Unpartitioned Queue: createBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
await sender.createBatch();
await sender.createMessageBatch();
});
});
it("Unpartitioned Queue: sendBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
const batch = await sender.createBatch();
batch.tryAdd({
const batch = await sender.createMessageBatch();
batch.tryAddMessage({
body: "hello"
});
await sender.sendMessages(batch);
@ -292,15 +292,15 @@ describe("Retries - MessageSender", () => {
it("Unpartitioned Queue with Sessions: createBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
await sender.createBatch();
await sender.createMessageBatch();
});
});
it("Unpartitioned Queue with Sessions: sendBatch", async function(): Promise<void> {
await beforeEachTest(TestClientType.UnpartitionedQueue);
await mockInitAndVerifyRetries(async () => {
const batch = await sender.createBatch();
batch.tryAdd({
const batch = await sender.createMessageBatch();
batch.tryAddMessage({
body: "hello"
});
await sender.sendMessages(batch);

Просмотреть файл

@ -558,10 +558,10 @@ describe("Tracing for send", function(): void {
const list = [{ name: "Albert" }, { name: "Marie" }];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
for (let i = 0; i < 2; i++) {
batch.tryAdd({ body: `${list[i].name}` }, { parentSpan: rootSpan });
batch.tryAddMessage({ body: `${list[i].name}` }, { parentSpan: rootSpan });
}
await sender.sendMessages(batch);
rootSpan.end();
@ -610,10 +610,10 @@ describe("Tracing for send", function(): void {
}
];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
for (let i = 0; i < 2; i++) {
batch.tryAdd(
batch.tryAddMessage(
{ body: `${list[i].name}`, applicationProperties: list[i].applicationProperties },
{ parentSpan: rootSpan }
);
@ -651,9 +651,9 @@ describe("Tracing for send", function(): void {
const list = [{ name: "Albert" }, { name: "Marie" }];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
for (let i = 0; i < 2; i++) {
batch.tryAdd({ body: `${list[i].name}` }, { parentSpan: rootSpan });
batch.tryAddMessage({ body: `${list[i].name}` }, { parentSpan: rootSpan });
}
await sender.sendMessages(batch, {
tracingOptions: {

Просмотреть файл

@ -69,10 +69,10 @@ describe("Send Batch", () => {
// Prepare messages to send
const messagesToSend = prepareMessages();
const sentMessages: ServiceBusMessage[] = [];
const batchMessage = await sender.createBatch({ maxSizeInBytes });
const batchMessage = await sender.createMessageBatch({ maxSizeInBytes });
for (const messageToSend of messagesToSend) {
const batchHasCapacity = batchMessage.tryAdd(messageToSend);
const batchHasCapacity = batchMessage.tryAddMessage(messageToSend);
if (!batchHasCapacity) {
break;
} else {
@ -106,7 +106,7 @@ describe("Send Batch", () => {
): Promise<void> {
// Prepare messages to send
const sentMessages: ServiceBusMessage[] = [];
const batchMessage = await sender.createBatch({ maxSizeInBytes });
const batchMessage = await sender.createMessageBatch({ maxSizeInBytes });
// Size of each message will be > 20000 bytes, maxMessageSize/20000 would exceed the limit
const numberOfMessagesToSend =
@ -119,7 +119,7 @@ describe("Send Batch", () => {
sessionId: entityNames.usesSessions ? `someSession ${i}` : undefined,
partitionKey: entityNames.usesSessions ? `someSession ${i}` : undefined
};
const batchHasCapacity = batchMessage.tryAdd(messageToSend);
const batchHasCapacity = batchMessage.tryAddMessage(messageToSend);
if (!batchHasCapacity) {
break;
} else {
@ -187,10 +187,10 @@ describe("Send Batch", () => {
// Prepare messages to send
const messagesToSend = prepareMessages();
const sentMessages: ServiceBusMessage[] = [];
const batchMessage = await sender.createBatch({ maxSizeInBytes });
const batchMessage = await sender.createMessageBatch({ maxSizeInBytes });
for (const messageToSend of messagesToSend) {
const batchHasCapacity = batchMessage.tryAdd(messageToSend);
const batchHasCapacity = batchMessage.tryAddMessage(messageToSend);
if (!batchHasCapacity) {
break;
} else {
@ -232,10 +232,10 @@ describe("Send Batch", () => {
// Prepare messages to send
const messagesToSend = prepareMessages(entityNames.usesSessions);
const sentMessages: ServiceBusMessage[] = [];
const batchMessage = await sender.createBatch();
const batchMessage = await sender.createMessageBatch();
for (const messageToSend of messagesToSend) {
const batchHasCapacity = batchMessage.tryAdd(messageToSend);
const batchHasCapacity = batchMessage.tryAddMessage(messageToSend);
if (!batchHasCapacity) {
break;
} else {
@ -296,10 +296,10 @@ describe("Send Batch", () => {
// Prepare messages to send
const messagesToSend = prepareMessages(useSessions);
const sentMessages: ServiceBusMessage[] = [];
const batchMessage = await sender.createBatch({ maxSizeInBytes });
const batchMessage = await sender.createMessageBatch({ maxSizeInBytes });
for (const messageToSend of messagesToSend) {
const batchHasCapacity = batchMessage.tryAdd(messageToSend);
const batchHasCapacity = batchMessage.tryAddMessage(messageToSend);
if (!batchHasCapacity) {
break;
} else {
@ -358,25 +358,25 @@ describe("Send Batch", () => {
): Promise<void> {
// Prepare messages to send
const messagesToSend = prepareMessages(entityNames.usesSessions);
const batchMessage = await sender.createBatch({ maxSizeInBytes });
const batchMessage = await sender.createMessageBatch({ maxSizeInBytes });
should.equal(
batchMessage.tryAdd(messagesToSend[0]),
batchMessage.tryAddMessage(messagesToSend[0]),
true,
"tryAdd should not have failed for the first message"
);
should.equal(
batchMessage.tryAdd(messagesToSend[1]),
batchMessage.tryAddMessage(messagesToSend[1]),
false,
"tryAdd should have failed for the second message"
);
should.equal(
batchMessage.tryAdd(messagesToSend[2]),
batchMessage.tryAddMessage(messagesToSend[2]),
false,
"tryAdd should have failed for the third message"
);
should.equal(
batchMessage.tryAdd(messagesToSend[3]),
batchMessage.tryAddMessage(messagesToSend[3]),
false,
"tryAdd should have failed for the fourth message"
);
@ -405,7 +405,7 @@ describe("Send Batch", () => {
async function testSendBatch(maxSizeInBytes?: number): Promise<void> {
let errorIsThrown = false;
try {
await sender.createBatch({ maxSizeInBytes });
await sender.createMessageBatch({ maxSizeInBytes });
} catch (error) {
const maxSize = await (sender as ServiceBusSenderImpl)["_sender"].getMaxMessageSize();
should.equal(
@ -460,10 +460,10 @@ describe("Send Batch", () => {
await beforeEachTest(TestClientType.UnpartitionedQueue);
const messagesToSend = [{ body: "hello" }];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
for (const message of messagesToSend) {
if (!batch.tryAdd(message)) {
if (!batch.tryAddMessage(message)) {
throw new Error("We do actually want to send all the events.");
}
}

Просмотреть файл

@ -458,7 +458,7 @@ describe("Errors after close()", function(): void {
should.equal(errorSend, expectedErrorMsg, "Expected error not thrown for sendMessages()");
let errorCreateBatch: string = "";
await sender.createBatch().catch((err) => {
await sender.createMessageBatch().catch((err) => {
errorCreateBatch = err.message;
});
should.equal(errorCreateBatch, expectedErrorMsg, "Expected error not thrown for createBatch()");

Просмотреть файл

@ -448,8 +448,8 @@ describe("Sample scenarios for track 2", () => {
break;
}
case "batch": {
const batch = await sender.createBatch();
assert.isTrue(batch.tryAdd(message));
const batch = await sender.createMessageBatch();
assert.isTrue(batch.tryAddMessage(message));
await sender.sendMessages(batch);
break;
}

Просмотреть файл

@ -761,9 +761,9 @@ describe("Streaming Receiver Tests", () => {
async function testConcurrency(maxConcurrentCalls?: number): Promise<void> {
const testMessages = [TestMessage.getSample(), TestMessage.getSample()];
const batchMessageToSend = await sender.createBatch();
const batchMessageToSend = await sender.createMessageBatch();
testMessages.forEach((message) => {
batchMessageToSend.tryAdd(message);
batchMessageToSend.tryAddMessage(message);
});
await sender.sendMessages(batchMessageToSend);
@ -820,7 +820,7 @@ describe("Streaming Receiver Tests", () => {
const totalNumOfMessages = 5;
let num = 1;
const messages = [];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
while (num <= totalNumOfMessages) {
const message = {
messageId: num,
@ -830,7 +830,7 @@ describe("Streaming Receiver Tests", () => {
};
num++;
messages.push(message);
batch.tryAdd(message);
batch.tryAddMessage(message);
}
await sender.sendMessages(batch);

Просмотреть файл

@ -685,9 +685,9 @@ describe("Streaming with sessions", () => {
}
const testMessages = [TestMessage.getSessionSample(), TestMessage.getSessionSample()];
const batchMessageToSend = await sender.createBatch();
const batchMessageToSend = await sender.createMessageBatch();
for (const message of testMessages) {
batchMessageToSend.tryAdd(message);
batchMessageToSend.tryAddMessage(message);
}
await sender.sendMessages(batchMessageToSend);
@ -754,7 +754,7 @@ describe("Streaming with sessions", () => {
const totalNumOfMessages = 5;
let num = 1;
const messages = [];
const batch = await sender.createBatch();
const batch = await sender.createMessageBatch();
while (num <= totalNumOfMessages) {
const message = {
messageId: num,
@ -765,7 +765,7 @@ describe("Streaming with sessions", () => {
};
num++;
messages.push(message);
batch.tryAdd(message);
batch.tryAddMessage(message);
}
await sender.sendMessages(batch);