[Service Bus] Promise executor functions should not be async (#13818)
This commit is contained in:
Родитель
f931f1b646
Коммит
65b209a32c
|
@ -67,7 +67,11 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
|
|||
private _onSessionClose: OnAmqpEvent;
|
||||
private _retryOptions: RetryOptions;
|
||||
|
||||
constructor(connectionContext: ConnectionContext, entityPath: string, retryOptions: RetryOptions) {
|
||||
constructor(
|
||||
connectionContext: ConnectionContext,
|
||||
entityPath: string,
|
||||
retryOptions: RetryOptions
|
||||
) {
|
||||
super(entityPath, entityPath, connectionContext, "sender", logger, {
|
||||
address: entityPath,
|
||||
audience: `${connectionContext.config.endpoint}${entityPath}`
|
||||
|
@ -170,112 +174,104 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
|
|||
? Constants.defaultOperationTimeoutInMs
|
||||
: this._retryOptions.timeoutInMs;
|
||||
|
||||
const sendEventPromise = () =>
|
||||
new Promise<void>(async (resolve, reject) => {
|
||||
const initStartTime = Date.now();
|
||||
if (!this.isOpen()) {
|
||||
try {
|
||||
await waitForTimeoutOrAbortOrResolve({
|
||||
actionFn: () => this.open(undefined, options?.abortSignal),
|
||||
abortSignal: options?.abortSignal,
|
||||
timeoutMs: timeoutInMs,
|
||||
timeoutMessage:
|
||||
`[${this._context.connectionId}] Sender "${this.name}" ` +
|
||||
`with address "${this.address}", was not able to send the message right now, due ` +
|
||||
`to operation timeout.`
|
||||
});
|
||||
} catch (err) {
|
||||
err = translateServiceBusError(err);
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while creating the sender",
|
||||
this.logPrefix,
|
||||
this.name
|
||||
);
|
||||
return reject(err);
|
||||
}
|
||||
}
|
||||
|
||||
const sendEventPromise = async (): Promise<void> => {
|
||||
const initStartTime = Date.now();
|
||||
if (!this.isOpen()) {
|
||||
try {
|
||||
const timeTakenByInit = Date.now() - initStartTime;
|
||||
|
||||
logger.verbose(
|
||||
"%s Sender '%s', credit: %d available: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
this.link?.credit,
|
||||
this.link?.session?.outgoing?.available()
|
||||
);
|
||||
|
||||
if (!this.link?.sendable()) {
|
||||
logger.verbose(
|
||||
"%s Sender '%s', waiting for 1 second for sender to become sendable",
|
||||
this.logPrefix,
|
||||
this.name
|
||||
);
|
||||
|
||||
await delay(1000);
|
||||
|
||||
logger.verbose(
|
||||
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
this.link?.credit,
|
||||
this.link?.session?.outgoing?.available()
|
||||
);
|
||||
}
|
||||
if (this.link?.sendable()) {
|
||||
if (timeoutInMs <= timeTakenByInit) {
|
||||
const desc: string =
|
||||
`${this.logPrefix} Sender "${this.name}" ` +
|
||||
`with address "${this.address}", was not able to send the message right now, due ` +
|
||||
`to operation timeout.`;
|
||||
logger.warning(desc);
|
||||
const e: AmqpError = {
|
||||
condition: ErrorNameConditionMapper.ServiceUnavailableError,
|
||||
description: desc
|
||||
};
|
||||
return reject(translateServiceBusError(e));
|
||||
}
|
||||
try {
|
||||
this.link.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
|
||||
const delivery = await this.link!.send(
|
||||
encodedMessage,
|
||||
undefined,
|
||||
sendBatch ? 0x80013700 : 0
|
||||
);
|
||||
logger.verbose(
|
||||
"%s Sender '%s', sent message with delivery id: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
delivery.id
|
||||
);
|
||||
return resolve();
|
||||
} catch (error) {
|
||||
error = translateServiceBusError(error.innerError || error);
|
||||
logger.logError(
|
||||
error,
|
||||
`${this.logPrefix} An error occurred while sending the message`
|
||||
);
|
||||
return reject(error);
|
||||
}
|
||||
} else {
|
||||
// let us retry to send the message after some time.
|
||||
const msg =
|
||||
`[${this.logPrefix}] Sender "${this.name}", ` +
|
||||
`cannot send the message right now. Please try later.`;
|
||||
logger.warning(msg);
|
||||
const amqpError: AmqpError = {
|
||||
condition: ErrorNameConditionMapper.SenderBusyError,
|
||||
description: msg
|
||||
};
|
||||
reject(translateServiceBusError(amqpError));
|
||||
}
|
||||
await waitForTimeoutOrAbortOrResolve({
|
||||
actionFn: () => this.open(undefined, options?.abortSignal),
|
||||
abortSignal: options?.abortSignal,
|
||||
timeoutMs: timeoutInMs,
|
||||
timeoutMessage:
|
||||
`[${this._context.connectionId}] Sender "${this.name}" ` +
|
||||
`with address "${this.address}", was not able to send the message right now, due ` +
|
||||
`to operation timeout.`
|
||||
});
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
err = translateServiceBusError(err);
|
||||
logger.logError(
|
||||
err,
|
||||
"%s An error occurred while creating the sender",
|
||||
this.logPrefix,
|
||||
this.name
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const timeTakenByInit = Date.now() - initStartTime;
|
||||
|
||||
logger.verbose(
|
||||
"%s Sender '%s', credit: %d available: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
this.link?.credit,
|
||||
this.link?.session?.outgoing?.available()
|
||||
);
|
||||
|
||||
if (!this.link?.sendable()) {
|
||||
logger.verbose(
|
||||
"%s Sender '%s', waiting for 1 second for sender to become sendable",
|
||||
this.logPrefix,
|
||||
this.name
|
||||
);
|
||||
|
||||
await delay(1000);
|
||||
|
||||
logger.verbose(
|
||||
"%s Sender '%s' after waiting for a second, credit: %d available: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
this.link?.credit,
|
||||
this.link?.session?.outgoing?.available()
|
||||
);
|
||||
}
|
||||
|
||||
if (!this.link?.sendable()) {
|
||||
// let us retry to send the message after some time.
|
||||
const msg =
|
||||
`[${this.logPrefix}] Sender "${this.name}", ` +
|
||||
`cannot send the message right now. Please try later.`;
|
||||
logger.warning(msg);
|
||||
const amqpError: AmqpError = {
|
||||
condition: ErrorNameConditionMapper.SenderBusyError,
|
||||
description: msg
|
||||
};
|
||||
throw translateServiceBusError(amqpError);
|
||||
}
|
||||
|
||||
if (timeoutInMs <= timeTakenByInit) {
|
||||
const desc: string =
|
||||
`${this.logPrefix} Sender "${this.name}" ` +
|
||||
`with address "${this.address}", was not able to send the message right now, due ` +
|
||||
`to operation timeout.`;
|
||||
logger.warning(desc);
|
||||
const e: AmqpError = {
|
||||
condition: ErrorNameConditionMapper.ServiceUnavailableError,
|
||||
description: desc
|
||||
};
|
||||
throw translateServiceBusError(e);
|
||||
}
|
||||
|
||||
try {
|
||||
this.link.sendTimeoutInSeconds = (timeoutInMs - timeTakenByInit) / 1000;
|
||||
const delivery = await this.link!.send(
|
||||
encodedMessage,
|
||||
undefined,
|
||||
sendBatch ? 0x80013700 : 0
|
||||
);
|
||||
logger.verbose(
|
||||
"%s Sender '%s', sent message with delivery id: %d",
|
||||
this.logPrefix,
|
||||
this.name,
|
||||
delivery.id
|
||||
);
|
||||
} catch (error) {
|
||||
error = translateServiceBusError(error.innerError || error);
|
||||
logger.logError(error, `${this.logPrefix} An error occurred while sending the message`);
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
const config: RetryConfig<void> = {
|
||||
operation: sendEventPromise,
|
||||
connectionId: this._context.connectionId!,
|
||||
|
@ -481,23 +477,18 @@ export class MessageSender extends LinkEntity<AwaitableSender> {
|
|||
if (this.isOpen()) {
|
||||
return this.link!.maxMessageSize;
|
||||
}
|
||||
return new Promise<number>(async (resolve, reject) => {
|
||||
try {
|
||||
const config: RetryConfig<void> = {
|
||||
operation: () => this.open(undefined, options?.abortSignal),
|
||||
connectionId: this._context.connectionId,
|
||||
operationType: RetryOperationType.senderLink,
|
||||
retryOptions: retryOptions,
|
||||
abortSignal: options?.abortSignal
|
||||
};
|
||||
|
||||
await retry<void>(config);
|
||||
const config: RetryConfig<void> = {
|
||||
operation: () => this.open(undefined, options?.abortSignal),
|
||||
connectionId: this._context.connectionId,
|
||||
operationType: RetryOperationType.senderLink,
|
||||
retryOptions: retryOptions,
|
||||
abortSignal: options?.abortSignal
|
||||
};
|
||||
|
||||
return resolve(this.link!.maxMessageSize);
|
||||
} catch (err) {
|
||||
reject(err);
|
||||
}
|
||||
});
|
||||
await retry<void>(config);
|
||||
|
||||
return this.link!.maxMessageSize;
|
||||
}
|
||||
|
||||
async createBatch(options?: CreateMessageBatchOptions): Promise<ServiceBusMessageBatch> {
|
||||
|
|
Загрузка…
Ссылка в новой задаче