зеркало из
1
0
Форкнуть 0
This commit is contained in:
fahad 2020-06-21 21:50:05 +05:30
Родитель e1d51cb195
Коммит f73579d75c
5 изменённых файлов: 32 добавлений и 22 удалений

Просмотреть файл

@ -6,6 +6,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.util.Strings;
@ -129,6 +130,11 @@ public class KustoSinkConfig extends AbstractConfig {
public KustoSinkConfig(Map<String, String> parsedConfig) {
this(getConfig(), parsedConfig);
if (Strings.isNullOrEmpty(getTopicToTableMapping())) {
throw new ConfigException("Missing 'kusto.tables.topics.mapping' configuration, "
+ "please configure it to appropriate value.");
}
}
public static ConfigDef getConfig() {
@ -147,7 +153,7 @@ public class KustoSinkConfig extends AbstractConfig {
defineConnectionConfigs(connectionGroupName, connectionGroupOrder, result);
defineWriteConfigs(writeGroupName, writeGroupOrder, result);
defineErrorHandlingAndRetriesConfgis(errorAndRetriesGroupName, errorAndRetriesGroupOrder, result);
return result;
}
@ -215,7 +221,7 @@ public class KustoSinkConfig extends AbstractConfig {
.define(
KUSTO_TABLES_MAPPING_CONF,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
"",
Importance.HIGH,
KUSTO_TABLES_MAPPING_DOC,
writeGroupName,
@ -225,7 +231,7 @@ public class KustoSinkConfig extends AbstractConfig {
.define(
KUSTO_TABLES_MAPPING_CONF_DEPRECATED,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
"",
Importance.HIGH,
KUSTO_TABLES_MAPPING_DOC + DEPRECATED_CONFIG_DOC,
writeGroupName,

Просмотреть файл

@ -77,7 +77,7 @@ public class E2ETest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -131,7 +131,7 @@ public class E2ETest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(100));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(300000));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, config);
writer2.open();
List<SinkRecord> records2 = new ArrayList<SinkRecord>();

Просмотреть файл

@ -1,6 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.connect.data.Schema;
@ -58,7 +60,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL);
fileWriter.openFile(null);
@ -88,7 +90,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@ -128,7 +130,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock());
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL);
String msg = "Message";
SinkRecord record = new SinkRecord("TestTopic", 1, null, null, null, msg, 1);
@ -147,7 +149,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock());
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL);
String msg2 = "Second Message";
SinkRecord record2 = new SinkRecord("TestTopic", 1, null, null, null, msg2, 1);
@ -197,7 +199,7 @@ public class FileWriterTest {
}
return Paths.get(path, Long.toString(offset)).toString();
};
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock);
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, BehaviorOnError.FAIL);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
@ -261,7 +263,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock());
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), BehaviorOnError.FAIL);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();

Просмотреть файл

@ -30,6 +30,7 @@ public class KustoSinkConnectorConfigTest {
public void shouldAcceptValidConfig() {
// Adding required Configuration with no default value.
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
config = new KustoSinkConfig(settings);
assertNotNull(config);
}
@ -38,9 +39,9 @@ public class KustoSinkConnectorConfigTest {
public void shouldHaveDefaultValues() {
// Adding required Configuration with no default value.
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
config = new KustoSinkConfig(settings);
assertNotNull(config.getKustoUrl());
assertNull(config.getTopicToTableMapping());
assertNotNull(config.getFlushSizeBytes());
assertNotNull(config.getFlushInterval());
assertFalse(config.isDlqEnabled());
@ -58,7 +59,7 @@ public class KustoSinkConnectorConfigTest {
public void shouldFailWhenErrorToleranceIncorrectlyConfigured() {
// Adding required Configuration with no default value.
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_BEHAVIOR_ON_ERROR_CONF, "DummyValue");
config = new KustoSinkConfig(settings);
}
@ -66,13 +67,14 @@ public class KustoSinkConnectorConfigTest {
@Test
public void verifyDlqSettings() {
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "kusto-url");
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF, "localhost:8081,localhost:8082");
//settings.put(KustoSinkConfig.CONNECTOR_NAME_CONF, "KustoConnectorTest");
settings.put(KustoSinkConfig.KUSTO_DLQ_TOPIC_NAME_CONF, "dlq-error-topic");
config = new KustoSinkConfig(settings);
assertTrue(config.isDlqEnabled());
assertEquals(Arrays.asList("localhost:8081", "localhost:8082"), config.getDlqBootstrapServers());
//assertEquals("KustoConnectorTest-error", config.getDlqTopicName());
assertEquals("dlq-error-topic", config.getDlqTopicName());
}
}

Просмотреть файл

@ -69,7 +69,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config);
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
@ -110,7 +110,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
}
@ -133,7 +133,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@ -165,7 +165,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
writer.open();
writer.close();
}
@ -215,7 +215,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
writer.open();
@ -253,7 +253,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
@ -307,7 +307,7 @@ public class TopicPartitionWriterTest {
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, COMMIT_IMMEDIATELY, config);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();