* Change lib to support workload identity (#127)

* Change lib to add support for workload identity
* Update README doc
Ramachandran A G 2024-09-04 20:23:18 +05:30 committed by GitHub
Parent 96d1485251
Commit a664fcc3bc
No key matching this signature was found
GPG key ID: B5690EEEBB952194
8 changed files: 61 additions and 34 deletions

View File

@ -334,7 +334,7 @@ The following is complete set of connector sink properties-
| 2 | topics | Kafka topic specification | List of topics separated by commas<br>*Required* |
| 3 | kusto.ingestion.url | Kusto ingestion endpoint URL | Provide the ingest URL of your ADX cluster<br>Use the following construct for the private URL - https://ingest-private-[cluster].kusto.windows.net<br>*Required* |
| 4 | kusto.query.url | Kusto query endpoint URL | Provide the engine URL of your ADX cluster<br>*Optional* |
| 5 | aad.auth.strategy | Credentials for Kusto | Strategy to authenticate against Azure Active Directory, either ``application`` (default) or ``managed_identity``.<br>*Optional, `application` by default* |
| 5 | aad.auth.strategy | Credentials for Kusto | Strategy to authenticate against Azure Active Directory, either ``application`` (default) or ``managed_identity`` or ``workload_identity``.<br>*Optional, `application` by default* |
| 6 | aad.auth.authority | Credentials for Kusto | Provide the tenant ID of your Azure Active Directory<br>*Required when authentication is done with an `application` or when `kusto.validation.table.enable` is set to `true`* |
| 7 | aad.auth.appid | Credentials for Kusto | Provide Azure Active Directory Service Principal Name<br>*Required when authentication is done with an `application` or when `kusto.validation.table.enable` is set to `true`* |
| 8 | aad.auth.appkey | Credentials for Kusto | Provide Azure Active Directory Service Principal secret<br>*Required when authentication is done with an `application`* |
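
The table rows in this hunk can be read as a minimal property set for the new strategy. The sketch below assembles only the keys shown in the hunk; the class name, method name, and URL values are hypothetical, and a real connector configuration needs further properties that this hunk does not show. It assumes that `workload_identity` needs no `aad.auth.appid` or `aad.auth.appkey`, since the `WORKLOAD_IDENTITY` branch added in this commit builds its credential with `new WorkloadIdentityCredentialBuilder().build()` and reads neither property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WorkloadIdentitySinkProps {
    // Builds the subset of sink properties listed in the README table hunk.
    // The keys come from the table; the values are supplied by the caller.
    static Map<String, String> build(String topics, String ingestUrl, String queryUrl) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("topics", topics);
        props.put("kusto.ingestion.url", ingestUrl);
        props.put("kusto.query.url", queryUrl);
        // New value introduced by this commit; the lower-case form is accepted.
        props.put("aad.auth.strategy", "workload_identity");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical cluster URLs, for illustration only.
        Map<String, String> props = build(
                "my-topic",
                "https://ingest-mycluster.kusto.windows.net",
                "https://mycluster.kusto.windows.net");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```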
@ -535,7 +535,7 @@ connector-<br>
### 11.2. Provision Kafka Connect workers on an Azure Kubernetes Service cluster
5. Provision KafkaConnect workers on our Azure Kubernetes Service cluster
1. Provision KafkaConnect workers on our Azure Kubernetes Service cluster
When we start off, all we have is an empty Kubernetes cluster-
@ -557,13 +557,13 @@ Note: This still does not have copy tasks (connector tasks) running yet
### 11.3. Postman for Kafka Connect REST APIs or REST calls from your CLI
6. Install Postman on our local machine<br>
7. Import KafkaConnect REST call JSON collection from Github into Postman<br>
1. Install Postman on our local machine<br>
2. Import KafkaConnect REST call JSON collection from Github into Postman<br>
https://github.com/Azure/azure-kusto-labs/blob/confluent-clound-hol/kafka-integration/confluent-cloud/rest-calls/Confluent-Cloud-ADX-HoL-1-STUB.postman_collection.json<br>
OR<br>
8. Find the REST calls [here](https://docs.confluent.io/current/connect/references/restapi.html) to call from CLI
1. Find the REST calls [here](https://docs.confluent.io/current/connect/references/restapi.html) to call from CLI
Note: Depending on Kafka security
configuration, [update the security configuration in the sink properties](https://docs.confluent.io/current/connect/security.html#authentication)
@ -571,7 +571,7 @@ configuration, [update the security configuration in the sink properties](https:
### 11.4. Launch the connector tasks using the Kafka Connect REST API
9. Launch the Kafka-ADX copy tasks/REST call, otherwise called connector tasks from Postman or via curl command
1. Launch the Kafka-ADX copy tasks/REST call, otherwise called connector tasks from Postman or via curl command
This is what we will see, a Kusto sink connector cluster with copy tasks running.
@ -747,7 +747,8 @@ the [Release History](README.md#17-release-history) section of this document.
| 4.0.8 | 2024-04-22 | <ul><li>Fix vulnerability CVE-2024-29025 by upgrading io.netty</li></ul> |
| 4.0.9 | 2024-04-22 | <ul><li>Fix vulnerability CVE-2024-29025 by upgrading io.netty referenced indirectly</li></ul> |
| 4.0.10 | 2024-06-25 | <ul><li>Fix vulnerability CVE-2024-35255 by upgrading azure libs referenced indirectly & update Java SDK</li></ul> |
| 4.0.11 | 2024-08-07 | <ul><li>Upgrade SDK dependencies</li></ul> |
| 4.1.1 | 2024-08-07 | <ul><li>Upgrade SDK dependencies</li></ul> |
| 4.1.2 | 2024-08-07 | <ul><li>Upgrade SDK dependencies, Support for WIF based authentication</li></ul> |
## 17. Contributing

14
pom.xml
View File

@ -8,31 +8,31 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
<version>4.1.1</version>
<version>4.1.2</version>
<properties>
<avro.random.generator.version>0.4.1</avro.random.generator.version>
<awaitility.version>4.2.0</awaitility.version>
<awaitility.version>4.2.2</awaitility.version>
<az.core.version>1.51.0</az.core.version>
<az.identity.version>1.13.2</az.identity.version>
<az.netty.version>1.15.3</az.netty.version>
<commoncodec.version>1.17.1</commoncodec.version>
<commonio.version>2.16.1</commonio.version>
<dependency-check-maven.version>10.0.2</dependency-check-maven.version>
<formatter.version>2.20.0</formatter.version>
<formatter.version>2.16.0</formatter.version>
<impsort.version>1.6.2</impsort.version>
<java.release.version>1.8</java.release.version>
<jsonassert.version>1.5.1</jsonassert.version>
<junit.version>5.10.0</junit.version>
<jsonassert.version>1.5.3</jsonassert.version>
<junit.version>5.11.0</junit.version>
<kafka.connect.avro.converter.version>7.7.0</kafka.connect.avro.converter.version>
<kafka.version>3.8.0</kafka.version>
<kusto.sdk.version>5.1.1</kusto.sdk.version>
<kusto.sdk.version>5.2.0</kusto.sdk.version>
<mockito.version>4.11.0</mockito.version>
<mvn.assembly.plugin.version>3.2.0</mvn.assembly.plugin.version>
<mvn.compiler.plugin.version>3.11.0</mvn.compiler.plugin.version>
<mvn.failsafe.plugin.version>3.0.0</mvn.failsafe.plugin.version>
<mvn.surefire.plugin.version>3.0.0</mvn.surefire.plugin.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<testcontainer.version>1.18.3</testcontainer.version>
<testcontainer.version>1.20.1</testcontainer.version>
</properties>
<build>
<plugins>

View File

@ -28,6 +28,7 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_INGEST_URL_CONF = "kusto.ingestion.url";
static final String KUSTO_ENGINE_URL_CONF = "kusto.query.url";
static final String KUSTO_AUTH_APPID_CONF = "aad.auth.appid";
static final String KUSTO_AUTH_ACCESS_TOKEN_CONF = "aad.auth.accesstoken";
static final String KUSTO_AUTH_APPKEY_CONF = "aad.auth.appkey";
static final String KUSTO_AUTH_AUTHORITY_CONF = "aad.auth.authority";
static final String KUSTO_AUTH_STRATEGY_CONF = "aad.auth.strategy";
@ -44,6 +45,7 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF = "errors.retry.backoff.time.ms";
static final String KUSTO_SINK_ENABLE_TABLE_VALIDATION = "kusto.validation.table.enable";
private static final String DLQ_PROPS_PREFIX = "misc.deadletterqueue.";
private static final String KUSTO_INGEST_URL_DOC = "Kusto ingestion endpoint URL.";
private static final String KUSTO_INGEST_URL_DISPLAY = "Kusto cluster ingestion URL";
private static final String KUSTO_ENGINE_URL_DOC = "Kusto query endpoint URL.";
@ -57,7 +59,6 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_CONNECTION_PROXY_PORT_DISPLAY = "Proxy port used to connect to Kusto";
private static final String KUSTO_AUTH_APPKEY_DISPLAY = "Kusto Auth AppKey";
static final String KUSTO_AUTH_ACCESS_TOKEN_CONF = "aad.auth.accesstoken";
private static final String KUSTO_AUTH_ACCESS_TOKEN_DISPLAY = "Kusto Auth AccessToken";
private static final String KUSTO_AUTH_ACCESS_TOKEN_DOC = "Kusto Access Token for Azure Active Directory authentication";
private static final String KUSTO_AUTH_AUTHORITY_DOC = "Azure Active Directory tenant.";
@ -326,11 +327,13 @@ public class KustoSinkConfig extends AbstractConfig {
KustoAuthenticationStrategy.APPLICATION.name(),
ConfigDef.ValidString.in(
KustoAuthenticationStrategy.APPLICATION.name(),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name(),
KustoAuthenticationStrategy.APPLICATION.name().toLowerCase(Locale.ENGLISH),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name(),
KustoAuthenticationStrategy.MANAGED_IDENTITY.name().toLowerCase(Locale.ENGLISH),
KustoAuthenticationStrategy.AZ_DEV_TOKEN.name(),
KustoAuthenticationStrategy.AZ_DEV_TOKEN.name().toLowerCase(Locale.ENGLISH)),
KustoAuthenticationStrategy.AZ_DEV_TOKEN.name().toLowerCase(Locale.ENGLISH),
KustoAuthenticationStrategy.WORKLOAD_IDENTITY.name(),
KustoAuthenticationStrategy.WORKLOAD_IDENTITY.name().toLowerCase(Locale.ENGLISH)),
Importance.HIGH,
KUSTO_AUTH_STRATEGY_DOC,
connectionGroupName,
@ -371,9 +374,11 @@ public class KustoSinkConfig extends AbstractConfig {
return this.getString(KUSTO_AUTH_APPID_CONF);
}
public String getAuthAppKey() {
return this.getPassword(KUSTO_AUTH_APPKEY_CONF).value();
}
public String getAuthAccessToken() {
return this.getPassword(KUSTO_AUTH_ACCESS_TOKEN_CONF).value();
}
@ -467,7 +472,7 @@ public class KustoSinkConfig extends AbstractConfig {
return this.getBoolean(KUSTO_SINK_ENABLE_TABLE_VALIDATION);
}
enum BehaviorOnError {
public enum BehaviorOnError {
FAIL, LOG, IGNORE;
/**
@ -483,7 +488,7 @@ public class KustoSinkConfig extends AbstractConfig {
}
}
enum KustoAuthenticationStrategy {
APPLICATION, MANAGED_IDENTITY, AZ_DEV_TOKEN
public enum KustoAuthenticationStrategy {
APPLICATION, MANAGED_IDENTITY, AZ_DEV_TOKEN, WORKLOAD_IDENTITY
}
}
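
The validator hunk above lists every strategy twice, once as the enum name and once lower-cased, because `ConfigDef.ValidString.in` matches values exactly. A minimal standalone sketch of that pattern follows, using only the JDK. The enum mirrors the one in this commit; the `allowedValues` and `parse` helpers are hypothetical, and the upper-casing step in `parse` is an assumption about how the configured string is mapped back to the enum, which this diff does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class AuthStrategyValues {
    // Mirrors the enum as it stands after this commit.
    enum KustoAuthenticationStrategy {
        APPLICATION, MANAGED_IDENTITY, AZ_DEV_TOKEN, WORKLOAD_IDENTITY
    }

    // Each constant contributes its upper-case name and its lower-case form,
    // so a new constant is picked up without editing the list by hand.
    static List<String> allowedValues() {
        List<String> values = new ArrayList<>();
        for (KustoAuthenticationStrategy s : KustoAuthenticationStrategy.values()) {
            values.add(s.name());
            values.add(s.name().toLowerCase(Locale.ENGLISH));
        }
        return values;
    }

    // Hypothetical helper: exact-match validation, then enum lookup.
    static KustoAuthenticationStrategy parse(String raw) {
        if (!allowedValues().contains(raw)) {
            throw new IllegalArgumentException("Invalid value for aad.auth.strategy: " + raw);
        }
        return KustoAuthenticationStrategy.valueOf(raw.toUpperCase(Locale.ENGLISH));
    }

    public static void main(String[] args) {
        System.out.println(allowedValues());
        System.out.println(parse("workload_identity"));
    }
}
```

Under this scheme a mixed-case value such as `Workload_Identity` is rejected, which matches the exact-match behavior of the list in the diff.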

View File

@ -21,6 +21,10 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.WorkloadIdentityCredential;
import com.azure.identity.WorkloadIdentityCredentialBuilder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
@ -71,7 +75,7 @@ public class KustoSinkTask extends SinkTask {
return Arrays.stream(config.getTopicToTableMapping()).anyMatch(TopicToTableMapping::isStreaming);
}
public static @NotNull ConnectionStringBuilder createKustoEngineConnectionString(KustoSinkConfig config, String clusterUrl) {
public static @NotNull ConnectionStringBuilder createKustoEngineConnectionString(@NotNull final KustoSinkConfig config, final String clusterUrl) {
final ConnectionStringBuilder kcsb;
switch (config.getAuthStrategy()) {
@ -86,12 +90,29 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Kusto authentication missing App Key.");
}
break;
case MANAGED_IDENTITY:
kcsb = ConnectionStringBuilder.createWithAadManagedIdentity(
clusterUrl,
config.getAuthAppId());
break;
case WORKLOAD_IDENTITY:
kcsb = ConnectionStringBuilder.createWithAadTokenProviderAuthentication(
clusterUrl,
() -> {
WorkloadIdentityCredential wic = new WorkloadIdentityCredentialBuilder().build();
TokenRequestContext requestContext = new TokenRequestContext();
String clusterScope = String.format("%s/.default", clusterUrl);
requestContext.setScopes(Collections.singletonList(clusterScope));
AccessToken accessToken = wic.getTokenSync(requestContext);
if (accessToken != null) {
log.debug("Returned access token that expires at {}", accessToken.getExpiresAt());
return accessToken.getToken();
} else {
log.error("Obtained empty token during token refresh. Context {}", clusterScope);
throw new ConnectException("Failed to retrieve WIF token");
}
});
break;
case AZ_DEV_TOKEN:
log.warn("Using DEV-TEST mode, use this for development only. NOT recommended for production scenarios");
kcsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(
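
The `WORKLOAD_IDENTITY` branch above passes a token callback to the connection string builder rather than a fixed token. The sketch below isolates that shape with JDK types only, so it runs without the Azure or Kusto libraries. `tokenSource` is a hypothetical stand-in for `WorkloadIdentityCredential.getTokenSync`, and `IllegalStateException` replaces the `ConnectException` used in the diff. The class and method names are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

final class TokenProviderSketch {
    // Builds the scope the same way the diff does: "<clusterUrl>/.default".
    static String clusterScope(String clusterUrl) {
        return String.format("%s/.default", clusterUrl);
    }

    // Returns a callback that fetches a token for the cluster scope on each call.
    // tokenSource stands in for the credential and returns null on failure.
    static Callable<String> tokenProvider(String clusterUrl, Function<String, String> tokenSource) {
        return () -> {
            String scope = clusterScope(clusterUrl);
            String token = tokenSource.apply(scope);
            if (token == null) {
                throw new IllegalStateException("Failed to retrieve token for scope " + scope);
            }
            return token;
        };
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical cluster URL and fake token source, for illustration only.
        Callable<String> provider = tokenProvider(
                "https://mycluster.kusto.windows.net",
                scope -> "token-for-" + scope);
        System.out.println(provider.call());
    }
}
```

Two details of the diff are worth noting when reading this sketch. The scope is formed by plain string formatting, so a `clusterUrl` with a trailing slash would yield a double slash before `.default`. The callback in the diff also constructs a new `WorkloadIdentityCredential` each time it runs, whereas building the credential once outside the lambda would be an alternative design.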

View File

@ -15,11 +15,11 @@ class ITCoordinates {
String table;
ITCoordinates(String appId, String appKey, String authority, String accessToken, String cluster,
String ingestCluster, String database, String table) {
String ingestCluster, String database, String table) {
this.appId = appId;
this.appKey = appKey;
this.accessToken = accessToken;
this.authority = StringUtils.defaultIfBlank(authority,"microsoft.com");
this.authority = StringUtils.defaultIfBlank(authority, "microsoft.com");
this.ingestCluster = ingestCluster;
this.cluster = cluster;
this.database = database;

View File

@ -5,13 +5,14 @@ import java.util.UUID;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenRequestContext;
import com.azure.identity.AzureCliCredentialBuilder;
public class ITSetup {
static ITCoordinates getConnectorProperties() {
static @NotNull ITCoordinates getConnectorProperties() {
String testPrefix = "tmpKafkaSinkIT_";
String appId = getProperty("appId", "", false);
String appKey = getProperty("appKey", "", false);
@ -21,7 +22,7 @@ public class ITSetup {
String database = getProperty("database", "e2e", true);
String defaultTable = testPrefix + UUID.randomUUID().toString().replace('-', '_');
String table = getProperty("table", defaultTable, true);
return new ITCoordinates(appId, appKey, authority,getAccessToken(cluster), cluster, ingestCluster, database, table);
return new ITCoordinates(appId, appKey, authority, getAccessToken(cluster), cluster, ingestCluster, database, table);
}
private static String getAccessToken(String cluster) {

View File

@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.jetbrains.annotations.NotNull;
import org.json.JSONException;
import org.junit.jupiter.api.*;
import org.skyscreamer.jsonassert.Customization;
@ -90,13 +91,11 @@ class KustoSinkIT {
@BeforeAll
public static void startContainers() throws Exception {
coordinates = getConnectorProperties();
if (coordinates.isValidConfig()) {
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(coordinates.cluster,
coordinates.accessToken);
ConnectionStringBuilder dmCsb = ConnectionStringBuilder.
createWithAadAccessTokenAuthentication(coordinates.ingestCluster,coordinates.accessToken);
ConnectionStringBuilder dmCsb = ConnectionStringBuilder.createWithAadAccessTokenAuthentication(coordinates.ingestCluster, coordinates.accessToken);
engineClient = ClientFactory.createClient(engineCsb);
dmClient = ClientFactory.createClient(dmCsb);
log.info("Creating tables in Kusto");
@ -156,7 +155,7 @@ class KustoSinkIT {
schemaRegistryContainer.stop();
kafkaContainer.stop();
engineClient.execute(coordinates.database, String.format(".drop table %s", coordinates.table));
log.info("Finished table clean up. Dropped table {}", coordinates.table);
log.warn("Finished table clean up. Dropped table {}", coordinates.table);
dmClient.close();
engineClient.close();
}
@ -215,7 +214,7 @@ class KustoSinkIT {
});
}
private void produceKafkaMessages(String dataFormat) throws IOException {
private void produceKafkaMessages(@NotNull String dataFormat) throws IOException {
log.debug("Producing messages");
int maxRecords = 10;
Map<String, Object> producerProperties = new HashMap<>();
@ -291,7 +290,7 @@ class KustoSinkIT {
log.info("Produced messages for format {}", dataFormat);
Map<Long, String> actualRecordsIngested = getRecordsIngested(dataFormat, maxRecords);
actualRecordsIngested.keySet().parallelStream().forEach(key -> {
log.debug("Record queried: {}", actualRecordsIngested.get(key));
log.debug("Record queried in assertion : {}", actualRecordsIngested.get(key));
try {
JSONAssert.assertEquals(expectedRecordsProduced.get(key), actualRecordsIngested.get(key),
new CustomComparator(LENIENT,
@ -307,7 +306,7 @@ class KustoSinkIT {
assertEquals(maxRecords, actualRecordsIngested.size());
}
private Map<Long, String> getRecordsIngested(String dataFormat, int maxRecords) {
private @NotNull Map<Long, String> getRecordsIngested(String dataFormat, int maxRecords) {
String query = String.format("%s | where type == '%s' | project %s,vresult = pack_all()", coordinates.table, dataFormat, keyColumn);
Predicate<Object> predicate = (results) -> {
if (results != null) {

View File

@ -1,6 +1,6 @@
.create-merge table TBL (vnum:int, vdec:decimal, vdate:datetime, vb:boolean, vreal:real, vstr:string, vlong:long,type:string)
.alter table TBL policy streamingingestion enable
.alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:05", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
.alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:10", "MaximumNumberOfItems": 10, "MaximumRawDataSizeMB": 100}'
.create-or-alter table TBL ingestion avro mapping "avro_mapping" '[{"Column":"vnum","Properties":{"Field":"vnum"}},{"Column":"vreal","Properties":{"Field":"vreal"}},{"Column":"vdec","Properties":{"Field":"vdec"}},{"Column":"vdate","Properties":{"Field":"vdate"}},{"Column":"vstr","Properties":{"Field":"vstr"}},{"Column":"vb","Properties":{"Field":"vb"}},{"Column":"vlong","datatype":"long","Properties":{"Field":"vlong"}},{"Column":"type","Properties":{"ConstValue":"avro"}}]'
.create-or-alter table TBL ingestion json mapping "json_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Path":"$.vnum"}},{"column":"vdate","datatype":"datetime","Properties":{"Path":"$.vdate"}},{"Column":"vdec","datatype":"decimal","Properties":{"Path":"$.vdec"}},{"column":"vb","datatype":"boolean","Properties":{"Path":"$.vb"}},{"column":"vreal","datatype":"real","Properties":{"Path":"$.vreal"}},{"column":"vstr","datatype":"string","Properties":{"Path":"$.vstr"}},{"column":"vlong","datatype":"long","Properties":{"Path":"$.vlong"}},{"column":"type","datatype":"string","Properties":{"ConstValue":"json"}}]'
.create-or-alter table TBL ingestion csv mapping "csv_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Ordinal":"4"}},{"column":"vdate","datatype":"datetime","Properties":{"Ordinal":"1"}},{"Column":"vdec","datatype":"decimal","Properties":{"Ordinal":"2"}},{"column":"vb","datatype":"boolean","Properties":{"Ordinal":"0"}},{"column":"vreal","datatype":"real","Properties":{"Ordinal":"5"}},{"column":"vstr","datatype":"string","Properties":{"Ordinal":"6"}},{"column":"vlong","datatype":"long","Properties":{"Ordinal":"3"}},{"column":"type","datatype":"string","Properties":{"ConstValue":"csv"}}]'