Merge pull request #4 from SanchayGupta1197/fix_kusto_sink_test
fixed KustoSinkTaskTest
This commit is contained in:
Коммит
6dca39dbaa
|
@ -193,7 +193,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
return topicsToIngestionProps.get(topic);
|
||||
}
|
||||
|
||||
private void validateTableMappings(KustoSinkConfig config) {
|
||||
void validateTableMappings(KustoSinkConfig config) {
|
||||
List<String> databaseTableErrorList = new ArrayList<>();
|
||||
List<String> accessErrorList = new ArrayList<>();
|
||||
try {
|
||||
|
@ -320,7 +320,10 @@ public class KustoSinkTask extends SinkTask {
|
|||
flushInterval = config.getFlushInterval();
|
||||
log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
|
||||
url, topicsToIngestionProps.keySet().toString()));
|
||||
open(context.assignment());
|
||||
// Adding this check to make code testable
|
||||
if(context!=null) {
|
||||
open(context.assignment());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -8,6 +8,7 @@ import org.junit.After;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.File;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
@ -18,6 +19,8 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
|
||||
public class KustoSinkTaskTest {
|
||||
|
@ -47,15 +50,17 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
kustoSinkTask.start(props);
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
tps.add(new TopicPartition("topic1", 1));
|
||||
tps.add(new TopicPartition("topic1", 2));
|
||||
tps.add(new TopicPartition("topic2", 1));
|
||||
|
||||
kustoSinkTask.open(tps);
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
assertEquals(kustoSinkTask.writers.size(), 3);
|
||||
assertEquals(kustoSinkTaskSpy.writers.size(), 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -68,21 +73,23 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
kustoSinkTask.start(props);
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
TopicPartition tp = new TopicPartition("topic1", 1);
|
||||
tps.add(tp);
|
||||
|
||||
kustoSinkTask.open(tps);
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
|
||||
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
|
||||
|
||||
kustoSinkTask.put(records);
|
||||
kustoSinkTaskSpy.put(records);
|
||||
|
||||
assertEquals(kustoSinkTask.writers.get(tp).currentOffset, 10);
|
||||
assertEquals(kustoSinkTaskSpy.writers.get(tp).currentOffset, 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -94,18 +101,20 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
kustoSinkTask.start(props);
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
|
||||
ArrayList<TopicPartition> tps = new ArrayList<>();
|
||||
tps.add(new TopicPartition("topic1", 1));
|
||||
|
||||
kustoSinkTask.open(tps);
|
||||
kustoSinkTaskSpy.open(tps);
|
||||
|
||||
List<SinkRecord> records = new ArrayList<SinkRecord>();
|
||||
|
||||
records.add(new SinkRecord("topic2", 1, null, null, null, "stringy message".getBytes(StandardCharsets.UTF_8), 10));
|
||||
|
||||
Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTask.put(records));
|
||||
Throwable exception = assertThrows(ConnectException.class, () -> kustoSinkTaskSpy.put(records));
|
||||
|
||||
assertEquals(exception.getMessage(), "Received a record without a mapped writer for topic:partition(topic2:1), dropping record.");
|
||||
|
||||
|
@ -120,18 +129,20 @@ public class KustoSinkTaskTest {
|
|||
props.put(KustoSinkConfig.KUSTO_AUTH_PASSWORD_CONF, "123456!");
|
||||
|
||||
KustoSinkTask kustoSinkTask = new KustoSinkTask();
|
||||
kustoSinkTask.start(props);
|
||||
KustoSinkTask kustoSinkTaskSpy = spy(kustoSinkTask);
|
||||
doNothing().when(kustoSinkTaskSpy).validateTableMappings(Mockito.<KustoSinkConfig>any());
|
||||
kustoSinkTaskSpy.start(props);
|
||||
{
|
||||
// single table mapping should cause all topics to be mapped to a single table
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getDatabaseName(), "db1");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getTableName(), "table1");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").ingestionProperties.getDataFormat(), "csv");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getDatabaseName(), "db2");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
|
||||
Assert.assertEquals(kustoSinkTask.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
|
||||
Assert.assertNull(kustoSinkTask.getIngestionProps("topic3"));
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDatabaseName(), "db1");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getTableName(), "table1");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat(), "csv");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName(), "db2");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
|
||||
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
|
||||
Assert.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче