[Service Bus] autoComplete -> autoCompleteMessages (#12558)
This commit is contained in:
Родитель
e8b73af38f
Коммит
993c7bcb69
|
@ -488,7 +488,7 @@ export interface SqlRuleFilter {
|
|||
|
||||
// @public
|
||||
export interface SubscribeOptions extends OperationOptionsBase {
|
||||
autoComplete?: boolean;
|
||||
autoCompleteMessages?: boolean;
|
||||
maxConcurrentCalls?: number;
|
||||
}
|
||||
|
||||
|
|
|
@ -111,9 +111,9 @@ async function receiveMessage() {
|
|||
receiver.subscribe(
|
||||
{ processMessage, processError },
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered
|
||||
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
|
||||
await delay(10000);
|
||||
await receiver.close();
|
||||
console.log("Total number of deferred messages:", deferredSteps.size);
|
||||
|
|
|
@ -30,18 +30,15 @@ async function main() {
|
|||
|
||||
try {
|
||||
const subscription = receiver.subscribe({
|
||||
// After executing this callback you provide, the receiver will remove the message from the queue if you
|
||||
// have not already settled the message in your callback.
|
||||
// You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method.
|
||||
// If your callback _does_ throw an error before the message is settled, then it will be abandoned.
|
||||
processMessage: async (brokeredMessage) => {
|
||||
console.log(`Received message: ${brokeredMessage.body}`);
|
||||
|
||||
// autoComplete, which is enabled by default, will automatically call
|
||||
// receiver.completeMessage() on your message after awaiting on your processMessage
|
||||
// handler so long as your handler does not throw an error.
|
||||
//
|
||||
// If your handler _does_ throw an error then the message will automatically
|
||||
// be abandoned using receiver.abandonMessage()
|
||||
//
|
||||
// autoComplete can be disabled in the options for subscribe().
|
||||
},
|
||||
// This callback will be called for any error that occurs when either in the receiver when receiving the message
|
||||
// or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message.
|
||||
processError: async (args) => {
|
||||
console.log(`Error from source ${args.errorSource} occurred: `, args.error);
|
||||
|
||||
|
|
|
@ -117,9 +117,9 @@ async function receiveMessage() {
|
|||
receiver.subscribe(
|
||||
{ processMessage, processError },
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
); // Disabling autoComplete so we can control when message can be completed, deferred or deadlettered
|
||||
); // Disabling autoCompleteMessages so we can control when message can be completed, deferred or deadlettered
|
||||
await delay(10000);
|
||||
await receiver.close();
|
||||
console.log("Total number of deferred messages:", deferredSteps.size);
|
||||
|
|
|
@ -38,18 +38,15 @@ export async function main() {
|
|||
|
||||
try {
|
||||
const subscription = receiver.subscribe({
|
||||
// After executing this callback you provide, the receiver will remove the message from the queue if you
|
||||
// have not already settled the message in your callback.
|
||||
// You can disable this by passing `false` to the `autoCompleteMessages` option in the `subscribe()` method.
|
||||
// If your callback _does_ throw an error before the message is settled, then it will be abandoned.
|
||||
processMessage: async (brokeredMessage: ServiceBusReceivedMessage) => {
|
||||
console.log(`Received message: ${brokeredMessage.body}`);
|
||||
|
||||
// autoComplete, which is enabled by default, will automatically call
|
||||
// receiver.completeMessage() on your message after awaiting on your processMessage
|
||||
// handler so long as your handler does not throw an error.
|
||||
//
|
||||
// If your handler _does_ throw an error then the message will automatically
|
||||
// be abandoned using receiver.abandonMessage()
|
||||
//
|
||||
// autoComplete can be disabled in the options for subscribe().
|
||||
},
|
||||
// This callback will be called for any error that occurs when either in the receiver when receiving the message
|
||||
// or when executing your `processMessage` callback or when the receiver automatically completes or abandons the message.
|
||||
processError: async (args: ProcessErrorArgs) => {
|
||||
console.log(`Error from source ${args.errorSource} occurred: `, args.error);
|
||||
|
||||
|
|
|
@ -162,7 +162,8 @@ export abstract class MessageReceiver extends LinkEntity<Receiver> {
|
|||
this.receiveMode = options.receiveMode || "peekLock";
|
||||
|
||||
// If explicitly set to false then autoComplete is false else true (default).
|
||||
this.autoComplete = options.autoComplete === false ? options.autoComplete : true;
|
||||
this.autoComplete =
|
||||
options.autoCompleteMessages === false ? options.autoCompleteMessages : true;
|
||||
this._lockRenewer = options.lockRenewer;
|
||||
}
|
||||
|
||||
|
|
|
@ -154,12 +154,16 @@ export interface GetMessageIteratorOptions extends OperationOptionsBase {}
|
|||
*/
|
||||
export interface SubscribeOptions extends OperationOptionsBase {
|
||||
/**
|
||||
* @property Indicates whether the `complete()` method on the message should automatically be
|
||||
* called by the sdk after the user provided onMessage handler has been executed.
|
||||
* Calling `complete()` on a message removes it from the Queue/Subscription.
|
||||
* @property Indicates whether the message should be settled using the `completeMessage()`
|
||||
* method on the receiver automatically after it executes the user provided message callback.
|
||||
* Doing so removes the message from the queue/subscription.
|
||||
*
|
||||
* This option is ignored if messages are received in the `receiveAndDelete` receive mode or if
|
||||
* the message is already settled in the user provided message callback.
|
||||
*
|
||||
* - **Default**: `true`.
|
||||
*/
|
||||
autoComplete?: boolean;
|
||||
autoCompleteMessages?: boolean;
|
||||
/**
|
||||
* @property The maximum number of concurrent calls that the library
|
||||
* can make to the user's message handler. Once this limit has been reached, more messages will
|
||||
|
|
|
@ -425,7 +425,7 @@ export class ServiceBusReceiverImpl implements ServiceBusReceiver {
|
|||
options: StreamingReceiverInitArgs
|
||||
): Promise<StreamingReceiver> {
|
||||
throwErrorIfConnectionClosed(this._context);
|
||||
if (options.autoComplete == null) options.autoComplete = true;
|
||||
if (options.autoCompleteMessages == null) options.autoCompleteMessages = true;
|
||||
|
||||
// When the user "stops" a streaming receiver (via the returned instance from 'subscribe' we just suspend
|
||||
// it, leaving the link open). This allows users to stop the flow of messages but still be able to settle messages
|
||||
|
|
|
@ -603,7 +603,8 @@ export class MessageSession extends LinkEntity<Receiver> {
|
|||
}
|
||||
|
||||
// If explicitly set to false then autoComplete is false else true (default).
|
||||
this.autoComplete = options.autoComplete === false ? options.autoComplete : true;
|
||||
this.autoComplete =
|
||||
options.autoCompleteMessages === false ? options.autoCompleteMessages : true;
|
||||
this._onMessage = onMessage;
|
||||
this._onError = onError;
|
||||
|
||||
|
|
|
@ -145,7 +145,7 @@ describe("receive and delete", () => {
|
|||
errors.push(args.error.message);
|
||||
}
|
||||
},
|
||||
{ autoComplete: autoCompleteFlag }
|
||||
{ autoCompleteMessages: autoCompleteFlag }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
|
|
@ -301,7 +301,7 @@ describe("Message Lock Renewal", () => {
|
|||
receiver.subscribe(
|
||||
{ processMessage, processError },
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
await delay(10000);
|
||||
|
@ -387,7 +387,7 @@ describe("Message Lock Renewal", () => {
|
|||
processError: async (err) => reject(err)
|
||||
},
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
});
|
||||
|
|
|
@ -275,7 +275,7 @@ describe("Session Lock Renewal", () => {
|
|||
receiver.subscribe(
|
||||
{ processMessage, processError },
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
await delay(10000);
|
||||
|
@ -341,7 +341,7 @@ describe("Session Lock Renewal", () => {
|
|||
}
|
||||
},
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
|
||||
|
|
|
@ -192,7 +192,7 @@ describe("session tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete: false }
|
||||
{ autoCompleteMessages: false }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
|
|
@ -136,7 +136,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete: false }
|
||||
{ autoCompleteMessages: false }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
@ -231,7 +231,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
@ -278,7 +278,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete: false }
|
||||
{ autoCompleteMessages: false }
|
||||
);
|
||||
|
||||
const deliveryCountFlag = await checkWithTimeout(
|
||||
|
@ -332,7 +332,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
|
||||
should.equal(
|
||||
|
@ -397,7 +397,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
should.equal(msgsCheck, true, `Expected 1, received ${receivedMsgs.length} messages`);
|
||||
|
@ -782,7 +782,7 @@ describe("Streaming Receiver Tests", () => {
|
|||
processError
|
||||
},
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
await receiver.close();
|
||||
|
@ -1040,7 +1040,7 @@ export function singleMessagePromise(
|
|||
}
|
||||
},
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
});
|
||||
|
|
|
@ -209,7 +209,7 @@ describe("Streaming with sessions", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete: false }
|
||||
{ autoCompleteMessages: false }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
@ -253,7 +253,7 @@ describe("Streaming with sessions", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => receivedMsgs.length === 1);
|
||||
|
@ -304,7 +304,7 @@ describe("Streaming with sessions", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
|
||||
const msgAbandonCheck = await checkWithTimeout(() => abandonFlag === 1);
|
||||
|
@ -364,7 +364,7 @@ describe("Streaming with sessions", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
|
||||
const sequenceNumCheck = await checkWithTimeout(() => sequenceNum !== 0);
|
||||
|
@ -425,7 +425,7 @@ describe("Streaming with sessions", () => {
|
|||
},
|
||||
processError
|
||||
},
|
||||
{ autoComplete }
|
||||
{ autoCompleteMessages: autoComplete }
|
||||
);
|
||||
|
||||
const msgsCheck = await checkWithTimeout(() => msgCount === 1);
|
||||
|
@ -764,7 +764,7 @@ describe("Streaming with sessions", () => {
|
|||
processError
|
||||
},
|
||||
{
|
||||
autoComplete: false
|
||||
autoCompleteMessages: false
|
||||
}
|
||||
);
|
||||
await receiver.close();
|
||||
|
|
Загрузка…
Ссылка в новой задаче