removed auth.user, auth.pass support | changed aad configs to mandatory
This commit is contained in:
Родитель
744f34c88d
Коммит
bb30c7cf2f
|
@ -60,13 +60,10 @@ value.converter=org.apache.kafka.connect.storage.StringConverter
|
|||
tasks.max=1
|
||||
topics=testing1,testing2
|
||||
|
||||
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
|
||||
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
|
||||
|
||||
kusto.url=https://ingest-mycluster.kusto.windows.net/
|
||||
|
||||
kusto.auth.username
|
||||
kusto.auth.password
|
||||
|
||||
aad.auth.appid
|
||||
aad.auth.appkey
|
||||
aad.auth.authority
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
|
||||
|
||||
tasks.max=1
|
||||
#topics=topic1,topic2
|
||||
#topics=testing1,testing2
|
||||
|
||||
#kusto.url=https://ingest-{cluster}.kusto.windows.net/
|
||||
|
||||
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
|
||||
|
||||
#kusto.auth.username=user1
|
||||
#kusto.auth.password=xxxxxxxx
|
||||
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
|
||||
|
||||
#aad.auth.appid=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
|
||||
#aad.auth.appkey=ZZZZZZZZZZZZZZZZZZZZZZZ
|
||||
|
|
|
@ -38,16 +38,8 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
// TODO: this might need to be per kusto cluster...
|
||||
static final String KUSTO_URL_CONF = "kusto.url";
|
||||
private static final String KUSTO_URL_DOC = "Kusto ingestion service URI.";
|
||||
private static final String KUSTO_URL_DISPLAY = "Kusto cluster URI";
|
||||
private static final String KUSTO_URL_DISPLAY = "Kusto cluster ingestion URI";
|
||||
|
||||
static final String KUSTO_AUTH_USERNAME_CONF = "kusto.auth.username";
|
||||
private static final String KUSTO_AUTH_USERNAME_DOC = "Kusto username for authentication, also configure kusto.auth.password.";
|
||||
private static final String KUSTO_AUTH_USERNAME_DISPLAY = "Kusto Auth Username";
|
||||
|
||||
static final String KUSTO_AUTH_PASSWORD_CONF = "kusto.auth.password";
|
||||
private static final String KUSTO_AUTH_PASSWORD_DOC = "Kusto password for the configured username.";
|
||||
private static final String KUSTO_AUTH_PASSWORD_DISPLAY = "Kusto Auth Password";
|
||||
|
||||
static final String KUSTO_AUTH_APPID_CONF = "aad.auth.appid";
|
||||
private static final String KUSTO_AUTH_APPID_DOC = "Application Id for Azure Active Directory authentication.";
|
||||
private static final String KUSTO_AUTH_APPID_DISPLAY = "Kusto Auth AppID";
|
||||
|
@ -306,30 +298,10 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
connectionGroupOrder++,
|
||||
Width.MEDIUM,
|
||||
KUSTO_URL_DISPLAY)
|
||||
.define(
|
||||
KUSTO_AUTH_USERNAME_CONF,
|
||||
Type.STRING,
|
||||
null,
|
||||
Importance.HIGH,
|
||||
KUSTO_AUTH_USERNAME_DOC,
|
||||
connectionGroupName,
|
||||
connectionGroupOrder++,
|
||||
Width.MEDIUM,
|
||||
KUSTO_AUTH_USERNAME_DISPLAY)
|
||||
.define(
|
||||
KUSTO_AUTH_PASSWORD_CONF,
|
||||
Type.PASSWORD,
|
||||
null,
|
||||
Importance.HIGH,
|
||||
KUSTO_AUTH_PASSWORD_DOC,
|
||||
connectionGroupName,
|
||||
connectionGroupOrder++,
|
||||
Width.MEDIUM,
|
||||
KUSTO_AUTH_PASSWORD_DISPLAY)
|
||||
.define(
|
||||
KUSTO_AUTH_APPKEY_CONF,
|
||||
Type.PASSWORD,
|
||||
null,
|
||||
ConfigDef.NO_DEFAULT_VALUE,
|
||||
Importance.HIGH,
|
||||
KUSTO_AUTH_APPKEY_DOC,
|
||||
connectionGroupName,
|
||||
|
@ -339,7 +311,7 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
.define(
|
||||
KUSTO_AUTH_APPID_CONF,
|
||||
Type.STRING,
|
||||
null,
|
||||
ConfigDef.NO_DEFAULT_VALUE,
|
||||
Importance.HIGH,
|
||||
KUSTO_AUTH_APPID_DOC,
|
||||
connectionGroupName,
|
||||
|
@ -349,7 +321,7 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
.define(
|
||||
KUSTO_AUTH_AUTHORITY_CONF,
|
||||
Type.STRING,
|
||||
null,
|
||||
ConfigDef.NO_DEFAULT_VALUE,
|
||||
Importance.HIGH,
|
||||
KUSTO_AUTH_AUTHORITY_DOC,
|
||||
connectionGroupName,
|
||||
|
@ -362,14 +334,6 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
return this.getString(KUSTO_URL_CONF);
|
||||
}
|
||||
|
||||
public String getAuthUsername() {
|
||||
return this.getString(KUSTO_AUTH_USERNAME_CONF);
|
||||
}
|
||||
|
||||
public String getAuthPassword() {
|
||||
return this.getPassword(KUSTO_AUTH_PASSWORD_CONF).value();
|
||||
}
|
||||
|
||||
public String getAuthAppid() {
|
||||
return this.getString(KUSTO_AUTH_APPID_CONF);
|
||||
}
|
||||
|
|
|
@ -76,19 +76,6 @@ public class KustoSinkTask extends SinkTask {
|
|||
return IngestClientFactory.createClient(kcsb);
|
||||
}
|
||||
|
||||
if (config.getAuthUsername() != null) {
|
||||
if (Strings.isNullOrEmpty(config.getAuthPassword())) {
|
||||
throw new ConfigException("Kusto authentication missing Password.");
|
||||
}
|
||||
|
||||
return IngestClientFactory.createClient(ConnectionStringBuilder.createWithAadUserCredentials(
|
||||
config.getKustoUrl(),
|
||||
config.getAuthUsername(),
|
||||
config.getAuthPassword()
|
||||
));
|
||||
}
|
||||
|
||||
|
||||
throw new ConfigException("Failed to initialize KustoIngestClient, please " +
|
||||
"provide valid credentials. Either Kusto username and password or " +
|
||||
"Kusto appId, appKey, and authority should be configured.");
|
||||
|
@ -115,16 +102,6 @@ public class KustoSinkTask extends SinkTask {
|
|||
return ClientFactory.createClient(kcsb);
|
||||
}
|
||||
|
||||
if (config.getAuthUsername() != null) {
|
||||
if (Strings.isNullOrEmpty(config.getAuthPassword())) {
|
||||
throw new ConfigException("Kusto authentication missing Password.");
|
||||
}
|
||||
return ClientFactory.createClient(ConnectionStringBuilder.createWithAadUserCredentials(
|
||||
engineClientURL,
|
||||
config.getAuthUsername(),
|
||||
config.getAuthPassword()
|
||||
));
|
||||
}
|
||||
throw new ConfigException("Failed to initialize KustoEngineClient, please " +
|
||||
"provide valid credentials. Either Kusto username and password or " +
|
||||
"Kusto appId, appKey, and authority should be configured.");
|
||||
|
@ -249,12 +226,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
String table = mapping.getString("table");
|
||||
try {
|
||||
|
||||
String authenticateWith;
|
||||
if (config.getAuthAppid() != null) {
|
||||
authenticateWith = "aadapp=" + config.getAuthAppid();
|
||||
} else {
|
||||
authenticateWith = "aaduser=" + config.getAuthUsername();
|
||||
}
|
||||
String authenticateWith = "aadapp=" + config.getAuthAppid();
|
||||
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table));
|
||||
boolean hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
|
||||
if (hasAccess) {
|
||||
|
|
|
@ -31,6 +31,9 @@ public class KustoSinkConnectorConfigTest {
|
|||
// Adding required Configuration with no default value.
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
assertNotNull(config);
|
||||
}
|
||||
|
@ -40,6 +43,9 @@ public class KustoSinkConnectorConfigTest {
|
|||
// Adding required Configuration with no default value.
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
assertNotNull(config.getKustoUrl());
|
||||
assertNotNull(config.getFlushSizeBytes());
|
||||
|
@ -52,6 +58,18 @@ public class KustoSinkConnectorConfigTest {
|
|||
public void shouldThrowExceptionWhenKustoURLNotGiven() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
}
|
||||
|
||||
@Test(expected = ConfigException.class)
|
||||
public void shouldThrowExceptionWhenAppIdNotGiven() {
|
||||
// Adding required Configuration with no default value.
|
||||
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
config = new KustoSinkConfig(settings);
|
||||
}
|
||||
|
||||
|
@ -60,6 +78,9 @@ public class KustoSinkConnectorConfigTest {
|
|||
// Adding required Configuration with no default value.
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
settings.put(KustoSinkConfig.KUSTO_BEHAVIOR_ON_ERROR_CONF, "DummyValue");
|
||||
config = new KustoSinkConfig(settings);
|
||||
}
|
||||
|
@ -68,6 +89,9 @@ public class KustoSinkConnectorConfigTest {
|
|||
public void verifyDlqSettings() {
|
||||
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
|
||||
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
|
||||
settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic");
|
||||
config = new KustoSinkConfig(settings);
|
||||
|
|
|
@ -46,8 +46,9 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://.kusto.windows.net");
|
||||
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME_CONF, "test@test.com");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
|
@ -69,8 +70,9 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'testing1','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME_CONF, "test@test.com");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
|
@ -98,8 +100,9 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, System.getProperty("java.io.tmpdir"));
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME_CONF, "test@test.com");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
|
@ -126,8 +129,9 @@ public class KustoSinkTaskTest {
|
|||
HashMap<String, String> props = new HashMap<>();
|
||||
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
|
||||
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_USERNAME_CONF, "test@test.com");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
|
||||
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
|
|
Загрузка…
Ссылка в новой задаче