Remove Interface Search Indexing Buffer (#13405)

* Remove Interface Soe Search Indexing Buffer

* Update sdk/search/search-documents/samples/typescript/src/bufferedSender/uploadDocuments/autoFlushSizeBased.ts

Co-authored-by: Jeff Fisher <xirzec@xirzec.com>

* Response to PR Comments

* Update Samples

* Minor fix

* Added await

* Minor Fix

Co-authored-by: Jeff Fisher <xirzec@xirzec.com>
This commit is contained in:
Sarangan Rajamanickam 2021-01-27 15:49:44 -08:00 коммит произвёл GitHub
Родитель 7edff06f05
Коммит 64e6844090
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
14 изменённых файлов: 405 добавлений и 503 удалений

Просмотреть файл

@ -455,7 +455,10 @@ export interface FreshnessScoringParameters {
// @public
export class GeographyPoint {
constructor(latitude: number, longitude: number);
constructor(geographyPoint: {
longitude: number;
latitude: number;
});
latitude: number;
longitude: number;
toJSON(): Record<string, unknown>;
@ -529,6 +532,11 @@ export class IndexDocumentsBatch<T> {
upload(documents: T[]): void;
}
// @public
export interface IndexDocumentsClient<T> {
indexDocuments(batch: IndexDocumentsBatch<T>, options: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
}
// @public
export interface IndexDocumentsOptions extends OperationOptions {
throwOnAnyFailure?: boolean;
@ -1437,7 +1445,7 @@ export interface ScoringProfile {
export type ScoringStatistics = "local" | "global";
// @public
export class SearchClient<T> {
export class SearchClient<T> implements IndexDocumentsClient<T> {
constructor(endpoint: string, indexName: string, credential: KeyCredential, options?: SearchClientOptions);
readonly apiVersion: string;
autocomplete<Fields extends keyof T>(searchText: string, suggesterName: string, options?: AutocompleteOptions<Fields>): Promise<AutocompleteResult>;
@ -1446,7 +1454,6 @@ export class SearchClient<T> {
readonly endpoint: string;
getDocument<Fields extends keyof T>(key: string, options?: GetDocumentOptions<Fields>): Promise<T>;
getDocumentsCount(options?: CountDocumentsOptions): Promise<number>;
getSearchIndexingBufferedSenderInstance(options?: SearchIndexingBufferedSenderOptions): SearchIndexingBufferedSender<T>;
indexDocuments(batch: IndexDocumentsBatch<T>, options?: IndexDocumentsOptions): Promise<IndexDocumentsResult>;
readonly indexName: string;
mergeDocuments(documents: T[], options?: MergeDocumentsOptions): Promise<IndexDocumentsResult>;
@ -1645,7 +1652,8 @@ export interface SearchIndexerWarning {
}
// @public
export interface SearchIndexingBufferedSender<T> {
export class SearchIndexingBufferedSender<T> {
constructor(client: IndexDocumentsClient<T>, options?: SearchIndexingBufferedSenderOptions);
deleteDocuments(documents: T[], options?: SearchIndexingBufferedSenderDeleteDocumentsOptions): Promise<void>;
dispose(): Promise<void>;
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
@ -1685,9 +1693,9 @@ export interface SearchIndexingBufferedSenderOptions {
autoFlush?: boolean;
flushWindowInMs?: number;
initialBatchActionCount?: number;
maxRetries?: number;
maxRetryDelayInMs?: number;
retryDelayInMs?: number;
maxRetriesPerAction?: number;
maxThrottlingDelayInMs?: number;
throttlingDelayInMs?: number;
}
// @public

Просмотреть файл

@ -42,7 +42,10 @@ function getDocumentsArray(size: number): Hotel[] {
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 47.678581
})
});
}
return array;
@ -62,11 +65,9 @@ export async function main() {
await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);
const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
{
autoFlush: true
}
);
const bufferedClient = new SearchIndexingBufferedSender<Hotel>(searchClient, {
autoFlush: true
});
bufferedClient.on("batchAdded", (response: any) => {
console.log(`Batch Added Event has been receieved: ${response}`);

Просмотреть файл

@ -37,7 +37,8 @@ export async function main() {
await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);
const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
const bufferedClient: SearchIndexingBufferedSender<Hotel> = new SearchIndexingBufferedSender(
searchClient,
{
autoFlush: true
}
@ -79,7 +80,10 @@ export async function main() {
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 47.678581
})
}
]);

Просмотреть файл

@ -34,7 +34,8 @@ export async function main() {
await createIndex(indexClient, TEST_INDEX_NAME);
await delay(WAIT_TIME);
const bufferedClient: SearchIndexingBufferedSender<Hotel> = searchClient.getSearchIndexingBufferedSenderInstance(
const bufferedClient: SearchIndexingBufferedSender<Hotel> = new SearchIndexingBufferedSender(
searchClient,
{
autoFlush: false
}
@ -76,7 +77,10 @@ export async function main() {
parkingIncluded: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 47.678581
})
}
]);

Просмотреть файл

@ -19,12 +19,11 @@ export default class GeographyPoint {
/**
* Constructs a new instance of GeographyPoint given
* the specified coordinates.
* @param latitude - latitude value in decimal
* @param longitude - longitude value in decimal
* @param geographyPoint - object with longitude and latitude values in decimal
*/
constructor(latitude: number, longitude: number) {
this.latitude = latitude;
this.longitude = longitude;
constructor(geographyPoint: { longitude: number; latitude: number }) {
this.longitude = geographyPoint.longitude;
this.latitude = geographyPoint.latitude;
}
/**

Просмотреть файл

@ -6,7 +6,7 @@ export {
DEFAULT_BATCH_SIZE,
DEFAULT_FLUSH_WINDOW,
DEFAULT_RETRY_COUNT
} from "./searchIndexingBufferedSenderImpl";
} from "./searchIndexingBufferedSender";
export {
AutocompleteRequest,
AutocompleteOptions,
@ -38,7 +38,7 @@ export {
SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions,
SearchIndexingBufferedSenderUploadDocumentsOptions
} from "./indexModels";
export { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender";
export { SearchIndexingBufferedSender, IndexDocumentsClient } from "./searchIndexingBufferedSender";
export { SearchIndexClient, SearchIndexClientOptions } from "./searchIndexClient";
export { SearchIndexerClient, SearchIndexerClientOptions } from "./searchIndexerClient";
export {

Просмотреть файл

@ -54,15 +54,15 @@ export interface SearchIndexingBufferedSenderOptions {
/**
* Maximum number of Retries
*/
maxRetries?: number;
maxRetriesPerAction?: number;
/**
* Delay between retries
*/
retryDelayInMs?: number;
throttlingDelayInMs?: number;
/**
* Max Delay between retries
*/
maxRetryDelayInMs?: number;
maxThrottlingDelayInMs?: number;
}
/**

Просмотреть файл

@ -41,15 +41,13 @@ import {
DeleteDocumentsOptions,
SearchDocumentsPageResult,
MergeOrUploadDocumentsOptions,
SearchRequest,
SearchIndexingBufferedSenderOptions
SearchRequest
} from "./indexModels";
import { odataMetadataPolicy } from "./odataMetadataPolicy";
import { IndexDocumentsBatch } from "./indexDocumentsBatch";
import { encode, decode } from "./base64";
import * as utils from "./serviceUtils";
import { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender";
import { createSearchIndexingBufferedSender } from "./searchIndexingBufferedSenderImpl";
import { IndexDocumentsClient } from "./searchIndexingBufferedSender";
/**
* Client options used to configure Cognitive Search API requests.
*/
@ -60,7 +58,7 @@ export type SearchClientOptions = PipelineOptions;
* including querying documents in the index as well as
* adding, updating, and removing them.
*/
export class SearchClient<T> {
export class SearchClient<T> implements IndexDocumentsClient<T> {
/// Maintenance note: when updating supported API versions,
/// the ContinuationToken logic will need to be updated below.
@ -618,17 +616,6 @@ export class SearchClient<T> {
}
}
/**
* Gets an instance of SearchIndexingBufferedSender.
* @param options - SearchIndexingBufferedSender Options
*/
public getSearchIndexingBufferedSenderInstance(
options: SearchIndexingBufferedSenderOptions = {}
): SearchIndexingBufferedSender<T> {
return createSearchIndexingBufferedSender(this, options);
}
private encodeContinuationToken(
nextLink: string | undefined,
nextPageParameters: SearchRequest | undefined

Просмотреть файл

@ -1,32 +1,168 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { IndexDocumentsBatch } from "./indexDocumentsBatch";
import {
IndexDocumentsAction,
SearchIndexingBufferedSenderOptions,
SearchIndexingBufferedSenderUploadDocumentsOptions,
SearchIndexingBufferedSenderMergeDocumentsOptions,
SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions,
SearchIndexingBufferedSenderDeleteDocumentsOptions,
SearchIndexingBufferedSenderFlushDocumentsOptions
SearchIndexingBufferedSenderFlushDocumentsOptions,
IndexDocumentsOptions
} from "./indexModels";
import { IndexDocumentsResult } from "./generated/data/models";
import { RestError } from "@azure/core-http";
import { RestError, OperationOptions } from "@azure/core-http";
import EventEmitter from "events";
import { createSpan } from "./tracing";
import { CanonicalCode } from "@opentelemetry/api";
import { delay } from "@azure/core-http";
import { getRandomIntegerInclusive } from "./serviceUtils";
/**
* Index Documents Client
*/
export interface IndexDocumentsClient<T> {
/**
* Perform a set of index modifications (upload, merge, mergeOrUpload, delete)
* for the given set of documents.
*
* @param batch - An array of actions to perform on the index.
* @param options - Additional options.
*/
indexDocuments(
batch: IndexDocumentsBatch<T>,
options: IndexDocumentsOptions
): Promise<IndexDocumentsResult>;
}
/**
* Default Batch Size
*/
export const DEFAULT_BATCH_SIZE: number = 512;
/**
* Default window flush interval
*/
export const DEFAULT_FLUSH_WINDOW: number = 60000;
/**
* Default number of times to retry.
*/
export const DEFAULT_RETRY_COUNT: number = 3;
/**
* Default retry delay.
*/
export const DEFAULT_RETRY_DELAY: number = 800;
/**
* Default Max Delay between retries.
*/
export const DEFAULT_MAX_RETRY_DELAY: number = 60000;
/**
* Class used to perform buffered operations against a search index,
* including adding, updating, and removing them.
*/
export interface SearchIndexingBufferedSender<T> {
export class SearchIndexingBufferedSender<T> {
/**
* Search Client used to call the underlying IndexBatch operations.
*/
private client: IndexDocumentsClient<T>;
/**
* Indicates if autoFlush is enabled.
*/
private autoFlush: boolean;
/**
* Interval between flushes (in milliseconds).
*/
private flushWindowInMs: number;
/**
* Delay between retries
*/
private throttlingDelayInMs: number;
/**
* Maximum number of Retries
*/
private maxRetriesPerAction: number;
/**
* Max Delay between retries
*/
private maxThrottlingDelayInMs: number;
/**
* Size of the batch.
*/
private initialBatchActionCount: number;
/**
* Batch object used to complete the service call.
*/
private batchObject: IndexDocumentsBatch<T>;
/**
* Clean up for the timer
*/
private cleanupTimer?: () => void;
/**
* Event emitter/publisher used in the Buffered Sender
*/
private readonly emitter = new EventEmitter();
/**
* Creates a new instance of SearchIndexingBufferedSender.
*
* @param client - Search Client used to call the underlying IndexBatch operations.
* @param options - Options to modify auto flush.
*
*/
constructor(client: IndexDocumentsClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
this.client = client;
// General Configuration properties
this.autoFlush = options.autoFlush ?? false;
this.initialBatchActionCount = options.initialBatchActionCount ?? DEFAULT_BATCH_SIZE;
this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW;
// Retry specific configuration properties
this.throttlingDelayInMs = options.throttlingDelayInMs ?? DEFAULT_FLUSH_WINDOW;
this.maxRetriesPerAction = options.maxRetriesPerAction ?? DEFAULT_RETRY_COUNT;
this.maxThrottlingDelayInMs = options.maxThrottlingDelayInMs ?? DEFAULT_MAX_RETRY_DELAY;
this.batchObject = new IndexDocumentsBatch<T>();
if (this.autoFlush) {
const interval = setInterval(() => this.flush(), this.flushWindowInMs);
interval?.unref();
this.cleanupTimer = () => {
clearInterval(interval);
};
}
}
/**
* Uploads the documents/Adds the documents to the upload queue.
*
* @param documents - Documents to be uploaded.
* @param options - Upload options.
*/
uploadDocuments(
public async uploadDocuments(
documents: T[],
options?: SearchIndexingBufferedSenderUploadDocumentsOptions
): Promise<void>;
options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-uploadDocuments",
options
);
try {
this.batchObject.upload(documents);
this.emitter.emit("batchAdded", {
action: "upload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Merges the documents/Adds the documents to the merge queue.
@ -34,10 +170,31 @@ export interface SearchIndexingBufferedSender<T> {
* @param documents - Documents to be merged.
* @param options - Upload options.
*/
mergeDocuments(
public async mergeDocuments(
documents: T[],
options?: SearchIndexingBufferedSenderMergeDocumentsOptions
): Promise<void>;
options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-mergeDocuments",
options
);
try {
this.batchObject.merge(documents);
this.emitter.emit("batchAdded", {
action: "merge",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Merges/Uploads the documents/Adds the documents to the merge/upload queue.
@ -45,10 +202,31 @@ export interface SearchIndexingBufferedSender<T> {
* @param documents - Documents to be merged/uploaded.
* @param options - Upload options.
*/
mergeOrUploadDocuments(
public async mergeOrUploadDocuments(
documents: T[],
options?: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions
): Promise<void>;
options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-mergeOrUploadDocuments",
options
);
try {
this.batchObject.mergeOrUpload(documents);
this.emitter.emit("batchAdded", {
action: "mergeOrUpload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Deletes the documents/Adds the documents to the delete queue.
@ -56,22 +234,67 @@ export interface SearchIndexingBufferedSender<T> {
* @param documents - Documents to be deleted.
* @param options - Upload options.
*/
deleteDocuments(
public async deleteDocuments(
documents: T[],
options?: SearchIndexingBufferedSenderDeleteDocumentsOptions
): Promise<void>;
options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-deleteDocuments",
options
);
try {
this.batchObject.delete(documents);
this.emitter.emit("batchAdded", {
action: "delete",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Flushes the queue manually.
*
* @param options - Flush options.
*/
flush(options?: SearchIndexingBufferedSenderFlushDocumentsOptions): Promise<void>;
public async flush(
options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-flush", options);
try {
if (this.batchObject.actions.length > 0) {
return this.internalFlush(true, updatedOptions);
}
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* If using autoFlush: true, call this to cleanup the autoflush timer.
*/
dispose(): Promise<void>;
public async dispose(): Promise<void> {
if (this.batchObject.actions.length > 0) {
await this.internalFlush(true);
}
if (this.cleanupTimer) {
this.cleanupTimer();
}
}
/**
* Attach Batch Added Event
@ -79,28 +302,34 @@ export interface SearchIndexingBufferedSender<T> {
* @param event - Event to be emitted
* @param listener - Event Listener
*/
on(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
public on(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
/**
* Attach Batch Sent Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
public on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
/**
* Attach Batch Succeeded Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
/**
* Attach Batch Failed Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
on(event: "batchFailed", listener: (e: RestError) => void): void;
public on(event: "batchFailed", listener: (e: RestError) => void): void;
public on(
event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed" | "batchResizing",
listener: (e: any) => void
): void {
this.emitter.on(event, listener);
}
/**
* Detach Batch Added Event
@ -108,26 +337,96 @@ export interface SearchIndexingBufferedSender<T> {
* @param event - Event to be emitted
* @param listener - Event Listener
*/
off(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
public off(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
/**
* Detach Batch Sent Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
public off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
/**
* Detach Batch Succeeded Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
/**
* Detach Batch Failed Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
off(event: "batchFailed", listener: (e: RestError) => void): void;
public off(event: "batchFailed", listener: (e: RestError) => void): void;
public off(
event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed",
listener: (e: any) => void
): void {
this.emitter.removeListener(event, listener);
}
private isBatchReady(): boolean {
return this.batchObject.actions.length >= this.initialBatchActionCount;
}
private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
if (force || (this.autoFlush && this.isBatchReady())) {
// Split it
const actions: IndexDocumentsAction<T>[] = this.batchObject.actions;
this.batchObject = new IndexDocumentsBatch<T>();
while (actions.length > 0) {
const actionsToSend = actions.splice(0, this.initialBatchActionCount);
await this.submitDocuments(actionsToSend, options);
}
}
}
private async submitDocuments(
actionsToSend: IndexDocumentsAction<T>[],
options: OperationOptions,
retryAttempt: number = 1
): Promise<void> {
try {
for (const action of actionsToSend) {
this.emitter.emit("beforeDocumentSent", action);
}
const result = await this.client.indexDocuments(
new IndexDocumentsBatch<T>(actionsToSend),
options
);
// raise success event
this.emitter.emit("batchSucceeded", result);
} catch (e) {
if (e.code && e.code === "413" && actionsToSend.length > 1) {
// Cut the payload size to half
const splitActionsArray = [
actionsToSend.slice(0, actionsToSend.length / 2),
actionsToSend.slice(actionsToSend.length / 2, actionsToSend.length)
];
this.initialBatchActionCount = splitActionsArray[0].length; // So, we do not want 413 happening again and again
for (const actions of splitActionsArray) {
await this.submitDocuments(actions, options);
}
} else if (this.isRetryAbleError(e) && retryAttempt <= this.maxRetriesPerAction) {
// Exponentially increase the delay each time
const exponentialDelay = this.throttlingDelayInMs * Math.pow(2, retryAttempt);
// Don't let the delay exceed the maximum
const clampedExponentialDelay = Math.min(this.maxThrottlingDelayInMs, exponentialDelay);
// Allow the final value to have some "jitter" (within 50% of the delay size) so
// that retries across multiple clients don't occur simultaneously.
const delayWithJitter =
clampedExponentialDelay / 2 + getRandomIntegerInclusive(0, clampedExponentialDelay / 2);
await delay(delayWithJitter);
await this.submitDocuments(actionsToSend, options, retryAttempt + 1);
} else {
this.emitter.emit("batchFailed", e);
throw e;
}
}
}
private isRetryAbleError(e: any): boolean {
return e.code && (e.code === "422" || e.code === "409" || e.code === "503");
}
}

Просмотреть файл

@ -1,424 +0,0 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import { IndexDocumentsBatch } from "./indexDocumentsBatch";
import {
IndexDocumentsAction,
SearchIndexingBufferedSenderOptions,
SearchIndexingBufferedSenderUploadDocumentsOptions,
SearchIndexingBufferedSenderMergeDocumentsOptions,
SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions,
SearchIndexingBufferedSenderDeleteDocumentsOptions,
SearchIndexingBufferedSenderFlushDocumentsOptions,
IndexDocumentsOptions
} from "./indexModels";
import { IndexDocumentsResult } from "./generated/data/models";
import { RestError, OperationOptions } from "@azure/core-http";
import EventEmitter from "events";
import { createSpan } from "./tracing";
import { CanonicalCode } from "@opentelemetry/api";
import { SearchIndexingBufferedSender } from "./searchIndexingBufferedSender";
import { delay } from "@azure/core-http";
import { getRandomIntegerInclusive } from "./serviceUtils";
interface IndexDocumentsClient<T> {
indexDocuments(
batch: IndexDocumentsBatch<T>,
options: IndexDocumentsOptions
): Promise<IndexDocumentsResult>;
}
/**
* Default Batch Size
*/
export const DEFAULT_BATCH_SIZE: number = 512;
/**
* Default window flush interval
*/
export const DEFAULT_FLUSH_WINDOW: number = 60000;
/**
* Default number of times to retry.
*/
export const DEFAULT_RETRY_COUNT: number = 3;
/**
* Default retry delay.
*/
export const DEFAULT_RETRY_DELAY: number = 800;
/**
* Default Max Delay between retries.
*/
export const DEFAULT_MAX_RETRY_DELAY: number = 60000;
/**
* Class used to perform buffered operations against a search index,
* including adding, updating, and removing them.
*/
class SearchIndexingBufferedSenderImpl<T> implements SearchIndexingBufferedSender<T> {
/**
* Search Client used to call the underlying IndexBatch operations.
*/
private client: IndexDocumentsClient<T>;
/**
* Indicates if autoFlush is enabled.
*/
private autoFlush: boolean;
/**
* Interval between flushes (in milliseconds).
*/
private flushWindowInMs: number;
/**
* Delay between retries
*/
private retryDelayInMs: number;
/**
* Maximum number of Retries
*/
private maxRetries: number;
/**
* Max Delay between retries
*/
private maxRetryDelayInMs: number;
/**
* Size of the batch.
*/
private initialBatchActionCount: number;
/**
* Batch object used to complete the service call.
*/
private batchObject: IndexDocumentsBatch<T>;
/**
* Clean up for the timer
*/
private cleanupTimer?: () => void;
/**
* Event emitter/publisher used in the Buffered Sender
*/
private readonly emitter = new EventEmitter();
/**
* Creates a new instance of SearchIndexingBufferedSender.
*
* @param client - Search Client used to call the underlying IndexBatch operations.
* @param options - Options to modify auto flush.
*
*/
constructor(client: IndexDocumentsClient<T>, options: SearchIndexingBufferedSenderOptions = {}) {
this.client = client;
// General Configuration properties
this.autoFlush = options.autoFlush ?? false;
this.initialBatchActionCount = options.initialBatchActionCount ?? DEFAULT_BATCH_SIZE;
this.flushWindowInMs = options.flushWindowInMs ?? DEFAULT_FLUSH_WINDOW;
// Retry specific configuration properties
this.retryDelayInMs = options.retryDelayInMs ?? DEFAULT_FLUSH_WINDOW;
this.maxRetries = options.maxRetries ?? DEFAULT_RETRY_COUNT;
this.maxRetryDelayInMs = options.maxRetryDelayInMs ?? DEFAULT_MAX_RETRY_DELAY;
this.batchObject = new IndexDocumentsBatch<T>();
if (this.autoFlush) {
const interval = setInterval(() => this.flush(), this.flushWindowInMs);
interval?.unref();
this.cleanupTimer = () => {
clearInterval(interval);
};
}
}
/**
* Uploads the documents/Adds the documents to the upload queue.
*
* @param documents - Documents to be uploaded.
* @param options - Upload options.
*/
public async uploadDocuments(
documents: T[],
options: SearchIndexingBufferedSenderUploadDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-uploadDocuments",
options
);
try {
this.batchObject.upload(documents);
this.emitter.emit("batchAdded", {
action: "upload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Merges the documents/Adds the documents to the merge queue.
*
* @param documents - Documents to be merged.
* @param options - Upload options.
*/
public async mergeDocuments(
documents: T[],
options: SearchIndexingBufferedSenderMergeDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-mergeDocuments",
options
);
try {
this.batchObject.merge(documents);
this.emitter.emit("batchAdded", {
action: "merge",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Merges/Uploads the documents/Adds the documents to the merge/upload queue.
*
* @param documents - Documents to be merged/uploaded.
* @param options - Upload options.
*/
public async mergeOrUploadDocuments(
documents: T[],
options: SearchIndexingBufferedSenderMergeOrUploadDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-mergeOrUploadDocuments",
options
);
try {
this.batchObject.mergeOrUpload(documents);
this.emitter.emit("batchAdded", {
action: "mergeOrUpload",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Deletes the documents/Adds the documents to the delete queue.
*
* @param documents - Documents to be deleted.
* @param options - Upload options.
*/
public async deleteDocuments(
documents: T[],
options: SearchIndexingBufferedSenderDeleteDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan(
"SearchIndexingBufferedSender-deleteDocuments",
options
);
try {
this.batchObject.delete(documents);
this.emitter.emit("batchAdded", {
action: "delete",
documents
});
return this.internalFlush(false, updatedOptions);
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* Flushes the queue manually.
*
* @param options - Flush options.
*/
public async flush(
options: SearchIndexingBufferedSenderFlushDocumentsOptions = {}
): Promise<void> {
const { span, updatedOptions } = createSpan("SearchIndexingBufferedSender-flush", options);
try {
if (this.batchObject.actions.length > 0) {
return this.internalFlush(true, updatedOptions);
}
} catch (e) {
span.setStatus({
code: CanonicalCode.UNKNOWN,
message: e.message
});
throw e;
} finally {
span.end();
}
}
/**
* If using autoFlush: true, call this to cleanup the autoflush timer.
*/
public async dispose(): Promise<void> {
if (this.batchObject.actions.length > 0) {
await this.internalFlush(true);
}
if (this.cleanupTimer) {
this.cleanupTimer();
}
}
/**
* Attach Batch Added Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public on(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
/**
* Attach Batch Sent Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public on(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
/**
* Attach Batch Succeeded Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public on(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
/**
* Attach Batch Failed Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public on(event: "batchFailed", listener: (e: RestError) => void): void;
public on(
event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed",
listener: (e: any) => void
): void {
this.emitter.on(event, listener);
}
/**
* Detach Batch Added Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public off(event: "batchAdded", listener: (e: { action: string; documents: T[] }) => void): void;
/**
* Detach Batch Sent Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public off(event: "beforeDocumentSent", listener: (e: IndexDocumentsAction<T>) => void): void;
/**
* Detach Batch Succeeded Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public off(event: "batchSucceeded", listener: (e: IndexDocumentsResult) => void): void;
/**
* Detach Batch Failed Event
*
* @param event - Event to be emitted
* @param listener - Event Listener
*/
public off(event: "batchFailed", listener: (e: RestError) => void): void;
public off(
event: "batchAdded" | "beforeDocumentSent" | "batchSucceeded" | "batchFailed",
listener: (e: any) => void
): void {
this.emitter.removeListener(event, listener);
}
private isBatchReady(): boolean {
return this.batchObject.actions.length >= this.initialBatchActionCount;
}
private async internalFlush(force: boolean, options: OperationOptions = {}): Promise<void> {
if (force || (this.autoFlush && this.isBatchReady())) {
// Split it
const actions: IndexDocumentsAction<T>[] = this.batchObject.actions;
this.batchObject = new IndexDocumentsBatch<T>();
while (actions.length > 0) {
const actionsToSend = actions.splice(0, this.initialBatchActionCount);
await this.submitDocuments(actionsToSend, options);
}
}
}
private async submitDocuments(
actionsToSend: IndexDocumentsAction<T>[],
options: OperationOptions,
retryAttempt: number = 1
): Promise<void> {
try {
for (const action of actionsToSend) {
this.emitter.emit("beforeDocumentSent", action);
}
const result = await this.client.indexDocuments(
new IndexDocumentsBatch<T>(actionsToSend),
options
);
// raise success event
this.emitter.emit("batchSucceeded", result);
} catch (e) {
if (this.isRetryAbleError(e) && retryAttempt <= this.maxRetries) {
// Exponentially increase the delay each time
const exponentialDelay = this.retryDelayInMs * Math.pow(2, retryAttempt);
// Don't let the delay exceed the maximum
const clampedExponentialDelay = Math.min(this.maxRetryDelayInMs, exponentialDelay);
// Allow the final value to have some "jitter" (within 50% of the delay size) so
// that retries across multiple clients don't occur simultaneously.
const delayWithJitter =
clampedExponentialDelay / 2 + getRandomIntegerInclusive(0, clampedExponentialDelay / 2);
await delay(delayWithJitter);
this.submitDocuments(actionsToSend, options, retryAttempt + 1);
} else {
this.emitter.emit("batchFailed", e);
throw e;
}
}
}
private isRetryAbleError(e: any): boolean {
return e.code && (e.code === "422" || e.code === "409" || e.code === "503");
}
}
/**
* Creates an object that satisfies the `SearchIndexingBufferedSender` interface.
* @param client - Search Client used to call the underlying IndexBatch operations.
* @param options - Options to modify auto flush.
*/
export function createSearchIndexingBufferedSender<T>(
indexDocumentsClient: IndexDocumentsClient<T>,
options: SearchIndexingBufferedSenderOptions = {}
): SearchIndexingBufferedSender<T> {
return new SearchIndexingBufferedSenderImpl(indexDocumentsClient, options);
}

Просмотреть файл

@ -97,7 +97,7 @@ function deserializeDates(input: unknown): Date | unknown {
function deserializeGeoPoint(input: unknown): GeographyPoint | unknown {
if (isGeoJSONPoint(input)) {
return new GeographyPoint(input.coordinates[0], input.coordinates[1]);
return new GeographyPoint({ longitude: input.coordinates[0], latitude: input.coordinates[1] });
}
return input;

Просмотреть файл

@ -7,7 +7,10 @@ import GeographyPoint from "../../src/geographyPoint";
describe("geographyPoint", () => {
it("JSON.stringify", () => {
const geoPoint = new GeographyPoint(47.669444, -122.123889);
const geoPoint = new GeographyPoint({
longitude: -122.123889,
latitude: 47.669444
});
const result = JSON.parse(JSON.stringify(geoPoint));
assert.deepEqual(result, {
type: "Point",

Просмотреть файл

@ -99,8 +99,8 @@ describe("serialization.deserialize", () => {
}
});
assert.instanceOf(result.location, GeographyPoint);
assert.equal(result.location.latitude, -84.527771);
assert.equal(result.location.longitude, 37.989769);
assert.equal(result.location.latitude, 37.989769);
assert.equal(result.location.longitude, -84.527771);
});
afterEach(() => {

Просмотреть файл

@ -250,7 +250,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: false,
lastRenovationDate: new Date(2010, 5, 27),
rating: 5,
location: new GeographyPoint(47.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 47.678581
})
},
{
hotelId: "2",
@ -263,7 +266,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: true,
lastRenovationDate: new Date(1982, 3, 28),
rating: 1,
location: new GeographyPoint(49.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 49.678581
})
},
{
hotelId: "3",
@ -276,7 +282,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: false,
lastRenovationDate: new Date(1995, 6, 1),
rating: 4,
location: new GeographyPoint(46.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 46.678581
})
},
{
hotelId: "4",
@ -289,7 +298,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: false,
lastRenovationDate: new Date(1995, 6, 1),
rating: 4,
location: new GeographyPoint(46.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 46.678581
})
},
{
hotelId: "5",
@ -302,7 +314,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: false,
lastRenovationDate: new Date(2012, 7, 12),
rating: 4,
location: new GeographyPoint(48.678581, -122.131577)
location: new GeographyPoint({
longitude: -122.131577,
latitude: 48.678581
})
},
{
hotelId: "6",
@ -335,7 +350,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: true,
lastRenovationDate: new Date(1970, 0, 18),
rating: 4,
location: new GeographyPoint(40.760586, -73.975403),
location: new GeographyPoint({
longitude: -73.975403,
latitude: 40.760586
}),
address: {
streetAddress: "677 5th Ave",
city: "New York",
@ -379,7 +397,10 @@ export async function populateIndex(client: SearchClient<Hotel>): Promise<void>
smokingAllowed: true,
lastRenovationDate: new Date(1999, 8, 6),
rating: 3,
location: new GeographyPoint(35.90416, -78.940483),
location: new GeographyPoint({
longitude: -78.940483,
latitude: 35.90416
}),
address: {
streetAddress: "6910 Fayetteville Rd",
city: "Durham",