
Azure Event Hubs Checkpoint Store client library for JavaScript

An Azure Table storage-based solution to store checkpoints and to aid in load balancing when using EventHubConsumerClient from the @azure/event-hubs library.

Key Links:

Getting started

Currently supported environments

See our support policy for more details.

Prerequisites

Install the package

Install the Azure Event Hubs Checkpoint Store Table library using npm.

npm install @azure/eventhubs-checkpointstore-table

Configure TypeScript

TypeScript users need to have Node type definitions installed:

npm install @types/node

You also need to enable compilerOptions.allowSyntheticDefaultImports in your tsconfig.json. Note that if you have enabled compilerOptions.esModuleInterop, allowSyntheticDefaultImports is enabled by default. See TypeScript's compiler options handbook for more information.

Key concepts

  • Scale: Create multiple consumers, with each consumer taking ownership of reading from a few Event Hubs partitions.

  • Load balance: Applications that support load balancing consist of one or more instances of EventHubConsumerClient that are configured to consume events from the same Event Hub and consumer group and to use the same CheckpointStore. These instances balance the workload by distributing the partitions to be processed among themselves.

  • Checkpointing: Checkpointing is the process by which readers mark or commit their position within a partition's event sequence. Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group. This means that for each consumer group, each partition reader must keep track of its current position in the event stream and can inform the service when it considers the data stream complete.

    If a reader disconnects from a partition, when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the location at which to start reading. In this way, you can use checkpointing both to mark events as "complete" by downstream applications and to provide resiliency if a failover between readers running on different machines occurs. It is possible to return to older data by specifying an offset lower than the one recorded by this checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and event stream replay.

    A TableCheckpointStore is a class that implements key methods required by the EventHubConsumerClient to balance load and update checkpoints.
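To make those key methods concrete, here is a minimal in-memory sketch of the four CheckpointStore methods from @azure/event-hubs that EventHubConsumerClient calls: listOwnership, claimOwnership, listCheckpoints and updateCheckpoint. The class name InMemoryCheckpointStoreSketch and all of its internals are hypothetical. This is not how TableCheckpointStore is implemented: TableCheckpointStore persists the same records to Azure Table storage, whereas this sketch keeps them in process memory and so cannot coordinate consumers running in separate processes. Resolving competing ownership claims with an etag check is modeled here as an assumption.

```javascript
// Hypothetical in-memory sketch of the CheckpointStore methods that
// EventHubConsumerClient calls. State lives in this process only, so it cannot
// balance load across separate processes and is lost on restart.
let etagCounter = 0;
const nextEtag = () => String(++etagCounter);

// One record per partition, keyed by namespace/event hub/consumer group/partition.
const keyOf = (record) =>
  [record.fullyQualifiedNamespace, record.eventHubName, record.consumerGroup, record.partitionId].join("/");

const inScope = (record, fullyQualifiedNamespace, eventHubName, consumerGroup) =>
  record.fullyQualifiedNamespace === fullyQualifiedNamespace &&
  record.eventHubName === eventHubName &&
  record.consumerGroup === consumerGroup;

class InMemoryCheckpointStoreSketch {
  constructor() {
    this.ownerships = new Map(); // which consumer currently owns each partition
    this.checkpoints = new Map(); // last committed position in each partition
  }

  async listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup) {
    return [...this.ownerships.values()].filter((o) =>
      inScope(o, fullyQualifiedNamespace, eventHubName, consumerGroup)
    );
  }

  // A claim succeeds only if the partition is unclaimed or the caller presents the
  // current etag, so two consumers racing for a partition cannot both win.
  // Only the successful claims are returned.
  async claimOwnership(partitionOwnership) {
    const claimed = [];
    for (const request of partitionOwnership) {
      const key = keyOf(request);
      const existing = this.ownerships.get(key);
      if (existing && existing.etag !== request.etag) {
        continue; // stale etag: another consumer changed ownership first
      }
      const record = { ...request, etag: nextEtag(), lastModifiedTimeInMs: Date.now() };
      this.ownerships.set(key, record);
      claimed.push(record);
    }
    return claimed;
  }

  async listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroup) {
    return [...this.checkpoints.values()].filter((c) =>
      inScope(c, fullyQualifiedNamespace, eventHubName, consumerGroup)
    );
  }

  // Overwrites the stored position; a consumer that later takes over this
  // partition resumes reading from here.
  async updateCheckpoint(checkpoint) {
    this.checkpoints.set(keyOf(checkpoint), { ...checkpoint });
  }
}
```

The etag check is what lets several consumer instances share partitions safely: each instance reads the current ownership records, then claims partitions by presenting the etag it saw, and only one of two racing claims can succeed.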

Examples

Create a CheckpointStore using Azure Table Storage

Use the following code snippet to create a CheckpointStore. You will need to provide the connection string for your storage account.

import { TableClient } from "@azure/data-tables";
import { TableCheckpointStore } from "@azure/eventhubs-checkpointstore-table";

// Build the table client from the storage account connection string.
const tableClient = TableClient.fromConnectionString("storage-connection-string", "table-name");

// Create the table that holds checkpoints and ownership records.
// This can be skipped if the table already exists.
await tableClient.createTable();

const checkpointStore = new TableCheckpointStore(tableClient);

Checkpoint events using Azure Table storage

To checkpoint events received using Azure Table Storage, you will need to pass an object that is compatible with the SubscriptionEventHandlers interface along with code to call the updateCheckpoint() method.

In this example, the object passed to subscribe() implements SubscriptionEventHandlers and also handles checkpointing.

const { EventHubConsumerClient } = require("@azure/event-hubs");
const { TableClient } = require("@azure/data-tables");
const { TableCheckpointStore } = require("@azure/eventhubs-checkpointstore-table");

const storageAccountConnectionString = "storage-account-connection-string";
const tableName = "table-name";
const eventHubConnectionString = "eventhub-connection-string";
const consumerGroup = "my-consumer-group";
const eventHubName = "eventHubName";

async function main() {
  const tableClient = TableClient.fromConnectionString(storageAccountConnectionString, tableName);

  // Create the table that stores checkpoints; this can be skipped if it already exists.
  await tableClient.createTable();

  const checkpointStore = new TableCheckpointStore(tableClient);
  const consumerClient = new EventHubConsumerClient(
    consumerGroup,
    eventHubConnectionString,
    eventHubName,
    checkpointStore
  );

  const subscription = consumerClient.subscribe({
    processEvents: async (events, context) => {
      // event processing code goes here
      if (events.length === 0) {
        // If the wait time expires (configured via options in maxWaitTimeInSeconds) Event Hubs
        // will pass you an empty array.
        return;
      }

      // Checkpointing will allow your service to pick up from
      // where it left off when restarting.
      //
      // You'll want to balance how often you checkpoint with the
      // performance of your underlying checkpoint store.
      await context.updateCheckpoint(events[events.length - 1]);
    },
    processError: async (err, context) => {
      // handle any errors that occur during the course of
      // this subscription
      console.log(`Errors in subscription to partition ${context.partitionId}: ${err}`);
    }
  });

  // Wait for a few seconds to receive events before closing
  await new Promise((resolve) => setTimeout(resolve, 10 * 1000));

  await subscription.close();
  await consumerClient.close();
  console.log(`Exiting sample`);
}

main().catch((err) => {
  console.log("Error occurred: ", err);
  process.exit(1);
});
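The sample above checkpoints after every non-empty batch, and its comments note that checkpoint frequency should be balanced against the performance of the checkpoint store. One way to reduce writes to Azure Table storage is to checkpoint only after a minimum number of events per partition. The sketch below assumes that occasionally reprocessing events is acceptable. createCheckpointThrottler is a hypothetical helper, not part of @azure/event-hubs or this library, and it relies only on the partitionId and updateCheckpoint() members of the context passed to processEvents.

```javascript
// Hypothetical helper (not part of @azure/event-hubs): checkpoint a partition
// only after at least `eventsPerCheckpoint` events have been processed since
// the last checkpoint, to reduce writes to the checkpoint store.
function createCheckpointThrottler(eventsPerCheckpoint) {
  // Number of events processed since the last checkpoint, per partition.
  const pending = new Map();

  // Returns true if a checkpoint was written for this batch.
  return async function maybeCheckpoint(events, context) {
    if (events.length === 0) {
      return false; // nothing new to record
    }
    const count = (pending.get(context.partitionId) || 0) + events.length;
    if (count < eventsPerCheckpoint) {
      pending.set(context.partitionId, count);
      return false;
    }
    // Checkpointing the last event in the batch records progress for the whole batch.
    await context.updateCheckpoint(events[events.length - 1]);
    pending.set(context.partitionId, 0);
    return true;
  };
}
```

Inside processEvents you would call `await maybeCheckpoint(events, context)` after processing a batch. The trade-off is that if a consumer stops, events processed since the last checkpoint are read again by whichever consumer next owns the partition, so downstream processing should tolerate duplicates.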

Troubleshooting

Logging

You can set the AZURE_LOG_LEVEL environment variable to one of the following values to enable logging to stderr:

  • verbose
  • info
  • warning
  • error

You can also set the log level programmatically by importing the @azure/logger package and calling the setLogLevel function with one of the log level values.

When setting a log level either programmatically or via the AZURE_LOG_LEVEL environment variable, any logs that are written using a log level equal to or less verbose than the one you choose will be emitted. For example, when you set the log level to info, the logs that are written for levels warning and error are also emitted. This SDK follows the Azure SDK for TypeScript guidelines when determining which level to log to.
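The ordering rule above can be modeled in a few lines. LOG_LEVELS and isEmitted are illustrative names, not part of @azure/logger; the sketch only encodes the documented behavior that choosing a level also emits every less verbose level.

```javascript
// Illustrative model of the documented filtering rule, not @azure/logger's code.
// Levels are listed from most verbose to least verbose.
const LOG_LEVELS = ["verbose", "info", "warning", "error"];

// Returns true if a message written at `messageLevel` is emitted when the
// configured level is `configuredLevel`.
function isEmitted(messageLevel, configuredLevel) {
  const messageRank = LOG_LEVELS.indexOf(messageLevel);
  const configuredRank = LOG_LEVELS.indexOf(configuredLevel);
  if (messageRank === -1 || configuredRank === -1) {
    throw new Error(`Unknown log level: ${messageRank === -1 ? messageLevel : configuredLevel}`);
  }
  // Emit the message if it is at least as severe as the configured level.
  return messageRank >= configuredRank;
}

// With the level set to "info", only "verbose" messages are suppressed.
console.log(LOG_LEVELS.filter((level) => isEmitted(level, "info")));
// prints [ 'info', 'warning', 'error' ]
```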

You can alternatively set the DEBUG environment variable to get logs when using this library. This can be useful if you also want to emit logs from the dependencies rhea-promise and rhea as well.

Note: AZURE_LOG_LEVEL, if set, takes precedence over DEBUG. Do not specify any Azure libraries via DEBUG when also specifying AZURE_LOG_LEVEL or calling setLogLevel.

You can set the following environment variable to get the debug logs when using this library.

  • To get only info-level debug logs from the Event Hubs Checkpoint Store Table library:
export DEBUG=azure:eventhubs-checkpointstore-table:info
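DEBUG takes a list of namespace patterns. As a rough illustration, the sketch below models matching under the conventions of the `debug` npm package: patterns separated by commas or whitespace, `*` as a wildcard, and a leading `-` to exclude. Treating those conventions as how this library's logger reads DEBUG is an assumption, and isNamespaceEnabled is a hypothetical function. It shows that the export above enables only the info namespace, while a pattern such as azure:eventhubs-checkpointstore-table:* would enable every level.

```javascript
// Hypothetical, simplified model of debug-style namespace matching; an
// assumption about how DEBUG is interpreted, not the library's implementation.
function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

// "*" matches any run of characters; everything else matches literally.
function patternToRegExp(pattern) {
  return new RegExp("^" + pattern.split("*").map(escapeRegExp).join(".*") + "$");
}

function isNamespaceEnabled(debugValue, namespace) {
  // Patterns are separated by commas and/or whitespace.
  const patterns = debugValue.split(/[\s,]+/).filter(Boolean);
  // A leading "-" excludes matching namespaces, and exclusions take priority.
  const excluded = patterns
    .filter((p) => p.startsWith("-"))
    .some((p) => patternToRegExp(p.slice(1)).test(namespace));
  if (excluded) {
    return false;
  }
  return patterns
    .filter((p) => !p.startsWith("-"))
    .some((p) => patternToRegExp(p).test(namespace));
}

const value = "azure:eventhubs-checkpointstore-table:info";
console.log(isNamespaceEnabled(value, "azure:eventhubs-checkpointstore-table:info")); // true
console.log(isNamespaceEnabled(value, "azure:eventhubs-checkpointstore-table:verbose")); // false
```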

Logging to a file

  • Enable logging as shown above and then run your test script as follows:

    • Logging statements from your test script go to out.log and logging statements from the SDK go to debug.log.

      node your-test-script.js > out.log 2>debug.log

    • Logging statements from your test script and the SDK go to the same file, out.log, by redirecting stdout to the file and then redirecting stderr (2) to wherever stdout points (&1):

      node your-test-script.js >out.log 2>&1

    • In bash, logging statements from your test script and the SDK can also be sent to the same file, out.log, with the &> shorthand:

      node your-test-script.js &> out.log

Next steps

Please take a look at the samples directory for detailed examples.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.
