added config tests | minor fixes

This commit is contained in:
fahad 2020-06-16 12:04:03 +05:30
Родитель e4b0bac612
Коммит 3c2b7e6c56
3 изменённых файлов: 39 добавлений и 7 удалений

Просмотреть файл

@ -32,6 +32,8 @@ public class KustoSinkConfig extends AbstractConfig {
}
}
static final String CONNECTOR_NAME_CONF = "name";
// TODO: this might need to be per kusto cluster...
static final String KUSTO_URL_CONF = "kusto.url";
private static final String KUSTO_URL_DOC = "Kusto ingestion service URI.";
@ -426,8 +428,8 @@ public class KustoSinkConfig extends AbstractConfig {
}
public String getDlqTopicName() {
String connectorName = this.getString("name");
String dlqTopicName = this.getString(KUSTO_DLQ_TOPIC_NAME_CONF);
String connectorName = getString(CONNECTOR_NAME_CONF);
String dlqTopicName = getString(KUSTO_DLQ_TOPIC_NAME_CONF);
dlqTopicName = (dlqTopicName.contains("${connector}"))
? dlqTopicName.replace("${connector}", connectorName)
: dlqTopicName;

Просмотреть файл

@ -3,6 +3,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.Results;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
@ -181,15 +182,15 @@ public class KustoSinkTask extends SinkTask {
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null) {
if (format.equals(IngestionProperties.DATA_FORMAT.json.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
} else if (format.equals(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.avro);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.parquet);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Parquet);
} else if (format.equals(IngestionProperties.DATA_FORMAT.orc.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.orc);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Orc);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv);
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);
}
}
}

Просмотреть файл

@ -4,11 +4,17 @@ import org.apache.kafka.common.config.ConfigException;
import org.junit.Before;
import org.junit.Test;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.ErrorTolerance;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class KustoSinkConnectorConfigTest {
Map<String, String> settings;
@ -37,6 +43,8 @@ public class KustoSinkConnectorConfigTest {
assertNull(config.getTopicToTableMapping());
assertNotNull(config.getFlushSizeBytes());
assertNotNull(config.getFlushInterval());
assertFalse(config.isDlqEnabled());
assertEquals(ErrorTolerance.NONE, config.getErrorTolerance());
}
@Test(expected = ConfigException.class)
@ -45,5 +53,26 @@ public class KustoSinkConnectorConfigTest {
settings.remove(KustoSinkConfig.KUSTO_URL_CONF);
config = new KustoSinkConfig(settings);
}
@Test(expected = ConfigException.class)
public void shouldFailWhenErrorToleranceIncorrectlyConfigured() {
// Adding required Configuration with no default value.
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_ERROR_TOLERANCE_CONF, "DummyValue");
config = new KustoSinkConfig(settings);
}
@Test
public void verifyDlqSettings() {
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
//settings.put(KustoSinkConfig.CONNECTOR_NAME_CONF, "KustoConnectorTest");
config = new KustoSinkConfig(settings);
assertTrue(config.isDlqEnabled());
assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers());
//assertEquals("KustoConnectorTest-error", config.getDlqTopicName());
}
}