Yihezkel Schoenbrun 2020-10-13 01:04:10 +03:00
Parent 770661695f
Commit 5a2c98e3a9
4 changed files with 49 additions and 62 deletions

View file

@@ -1,8 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
@@ -25,13 +25,7 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Properties;
import java.util.*;
import java.util.logging.Logger;
public class E2ETest {
@@ -49,14 +43,14 @@ public class E2ETest {
private Producer<byte[], byte[]> kafkaProducer;
@Before
public void setUp(){
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9000");
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaProducer = new KafkaProducer<>(properties);
isDlqEnabled = false;
dlqTopicName = null;
public void setUp() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9000");
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaProducer = new KafkaProducer<>(properties);
isDlqEnabled = false;
dlqTopicName = null;
}
@Test
@@ -90,9 +84,10 @@ public class E2ETest {
props.ingestionProperties = ingestionProperties;
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
props.ingestionProperties.setIngestionMapping("mappy", IngestionMapping.IngestionMappingKind.Csv);
String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster);
String basepath = Paths.get(basePath, "csv").toString();
Map<String, String> settings = getKustoConfigs(KustoUrl, basepath, "mappy", fileThreshold, flushInterval);
Map<String, String> settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "mappy", fileThreshold, flushInterval);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
@@ -142,12 +137,13 @@ public class E2ETest {
props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
props2.ingestionProperties.setIngestionMapping("avroMapping", IngestionMapping.IngestionMappingKind.Avro);
TopicPartition tp2 = new TopicPartition("testPartition2", 11);
String KustoUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster);
String basepath = Paths.get(basePath, "avro").toString();
long fileThreshold = 100;
long flushInterval = 300000;
Map<String, String> settings = getKustoConfigs(KustoUrl, basepath, "avri", fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
Map<String, String> settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, "avri", fileThreshold, flushInterval);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer2.open();
List<SinkRecord> records2 = new ArrayList<>();
@@ -193,10 +189,11 @@ public class E2ETest {
this.log.info("Successfully ingested " + expectedNumberOfRows + " records.");
}
private Map<String, String> getKustoConfigs(String clusterUrl, String basePath,String tableMapping, long fileThreshold,
long flushInterval) {
private Map<String, String> getKustoConfigs(String clusterUrl, String engineUrl, String basePath, String tableMapping,
long fileThreshold, long flushInterval) {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, clusterUrl);
settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, engineUrl);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, tableMapping);
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, appId);
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, appKey);
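Note: the E2ETest changes above split the single Kusto URL into a DM (ingestion) endpoint and an engine (query) endpoint and thread both through getKustoConfigs. Below is a minimal sketch of the resulting wiring; the cluster name and credentials are placeholders, and the remaining required settings are elided here just as they are cut off in the hunk above.

import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class DualEndpointConfigSketch {
    public static void main(String[] args) {
        String cluster = "mycluster"; // placeholder cluster name
        Map<String, String> settings = new HashMap<>();
        // The DM (ingestion) endpoint stays in KUSTO_URL_CONF...
        settings.put(KustoSinkConfig.KUSTO_URL_CONF,
                String.format("https://ingest-%s.kusto.windows.net", cluster));
        // ...and the engine (query) endpoint goes into the new KUSTO_ENGINE_URL_CONF.
        settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF,
                String.format("https://%s.kusto.windows.net", cluster));
        settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
        settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
        settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
        // Constructing the config; a real run may need further settings
        // (auth authority, temp dir) not shown in the truncated hunk.
        KustoSinkConfig config = new KustoSinkConfig(settings);
    }
}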

View file

@@ -58,7 +58,8 @@ public class FileWriterTest {
final String FILE_PATH = Paths.get(path, "ABC").toString();
final int MAX_FILE_SIZE = 128;
Consumer<SourceFile> trackFiles = (SourceFile f) -> {};
Consumer<SourceFile> trackFiles = (SourceFile f) -> {
};
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
@@ -82,11 +83,9 @@ public class FileWriterTest {
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
Assert.assertEquals(0, Objects.requireNonNull(folder.listFiles()).length);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 100;
Consumer<SourceFile> trackFiles = (SourceFile f) -> files.put(f.path, f.rawBytes);
@@ -279,6 +278,7 @@ public class FileWriterTest {
protected Map<String, String> getProperties() {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "xxx");
settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, "xxx");
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");

View file

@@ -13,8 +13,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class KustoSinkConnectorConfigTest {
private static final String ENGINE_URI = "https://cluster_name.kusto.windows.net";
private static final String DM_URI = "https://ingest-cluster_name.kusto.windows.net";
private static final String ENGINE_URI = "https://cluster_name.kusto.windows.net";
@Test
public void shouldAcceptValidConfig() {
@@ -42,15 +42,6 @@ public class KustoSinkConnectorConfigTest {
new KustoSinkConfig(settings);
}
@Test
public void shouldGuessKustoEngineUrlWhenNotGivenPrivateCase() {
HashMap<String, String> settings = setupConfigs();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, "https://private-ingest-cluster_name.kusto.windows.net");
KustoSinkConfig config = new KustoSinkConfig(settings);
String kustoEngineUrl = config.getKustoEngineUrl();
assertEquals("https://private-cluster_name.kusto.windows.net", kustoEngineUrl);
}
@Test
public void shouldUseDmUrlWhenKustoEngineUrlNotGivenAndCantGuess() {
HashMap<String, String> settings = setupConfigs();
@@ -118,6 +109,7 @@ public class KustoSinkConnectorConfigTest {
public static HashMap<String, String> setupConfigs() {
HashMap<String, String> configs = new HashMap<>();
configs.put(KustoSinkConfig.KUSTO_URL_CONF, DM_URI);
configs.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, ENGINE_URI);
configs.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
configs.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
configs.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
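Note: the deleted shouldGuessKustoEngineUrlWhenNotGivenPrivateCase test pinned down how the connector guesses the engine URL when KUSTO_ENGINE_URL_CONF is absent. A hypothetical helper reconstructing that rule from the deleted assertions (the name guessEngineUrl is illustrative, not the connector's actual method):

// Drop the "ingest-" marker from the DM host:
//   https://ingest-cluster_name.kusto.windows.net         -> https://cluster_name.kusto.windows.net
//   https://private-ingest-cluster_name.kusto.windows.net -> https://private-cluster_name.kusto.windows.net
static String guessEngineUrl(String dmUrl) {
    return dmUrl.replaceFirst("ingest-", "");
}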

View file

@@ -11,10 +11,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.junit.Assert;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -22,18 +22,15 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.*;
import static org.mockito.Mockito.*;
public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (FileWriter)...
private File currentDirectory;
private static final String KUSTO_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
private static final String KUSTO_INGEST_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
private static final String KUSTO_CLUSTER_URL = "https://cluster.kusto.windows.net";
private static final String DATABASE = "testdb1";
private static final String TABLE = "testtable1";
private boolean isDlqEnabled;
@@ -76,13 +73,13 @@ public class TopicPartitionWriterTest {
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
SourceFile descriptor = new SourceFile();
descriptor.rawBytes = 1024;
descriptor.path = "somepath/somefile";
descriptor.file = new File ("C://myfile.txt");
descriptor.file = new File("C://myfile.txt");
writer.handleRollFile(descriptor);
ArgumentCaptor<FileSourceInfo> fileSourceInfoArgument = ArgumentCaptor.forClass(FileSourceInfo.class);
@@ -96,7 +93,7 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(fileSourceInfoArgument.getValue().getFilePath(), descriptor.path);
Assert.assertEquals(TABLE, ingestionPropertiesArgumentCaptor.getValue().getTableName());
Assert.assertEquals(DATABASE, ingestionPropertiesArgumentCaptor.getValue().getDatabaseName());
Assert.assertEquals(fileSourceInfoArgument.getValue().getRawSizeInBytes(), 1024);
Assert.assertEquals(1024, fileSourceInfoArgument.getValue().getRawSizeInBytes());
}
@Test
@@ -111,7 +108,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
Assert.assertEquals(writer.getFilePath(null), Paths.get(config.getTempDirPath(), "kafka_testTopic_11_0.csv.gz").toString());
@@ -128,7 +125,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
List<SinkRecord> records = new ArrayList<>();
@@ -155,7 +152,7 @@ public class TopicPartitionWriterTest {
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
writer.close();
@@ -188,7 +185,7 @@ public class TopicPartitionWriterTest {
}
@Test
public void testWriteStringyValuesAndOffset() throws Exception {
public void testWriteStringyValuesAndOffset() {
TopicPartition tp = new TopicPartition("testTopic", 2);
IngestClient mockClient = mock(IngestClient.class);
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
@@ -199,11 +196,11 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "another,stringy,message", 3));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, "{'also':'stringy','sortof':'message'}", 4));
@@ -221,7 +218,7 @@ public class TopicPartitionWriterTest {
TopicPartition tp = new TopicPartition("testPartition", 11);
IngestClient mockClient = mock(IngestClient.class);
String basePath = Paths.get(currentDirectory.getPath(), "testWriteStringyValuesAndOffset").toString();
String[] messages = new String[]{ "stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
String[] messages = new String[]{"stringy message", "another,stringy,message", "{'also':'stringy','sortof':'message'}"};
// Expect to finish the file after writing the fourth message because of fileThreshold
long fileThreshold = messages[0].length() + messages[1].length() + messages[2].length() + messages[2].length() - 1;
@@ -230,11 +227,11 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[0], 10));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[1], 13));
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.STRING_SCHEMA, messages[2], 14));
@@ -245,8 +242,8 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
Assert.assertEquals((long) writer.lastCommittedOffset, (long) 15);
Assert.assertEquals(writer.currentOffset, 16);
Assert.assertEquals(15, (long) writer.lastCommittedOffset);
Assert.assertEquals(16, writer.currentOffset);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString());
@@ -277,19 +274,19 @@ public class TopicPartitionWriterTest {
props.ingestionProperties = new IngestionProperties(DATABASE, TABLE);
props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
KustoSinkConfig config = new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
writer.open();
List<SinkRecord> records = new ArrayList<SinkRecord>();
List<SinkRecord> records = new ArrayList<>();
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, o.toByteArray(), 10));
for (SinkRecord record : records) {
writer.writeRecord(record);
}
Assert.assertEquals((long) writer.lastCommittedOffset, (long) 10);
Assert.assertEquals(writer.currentOffset, 10);
Assert.assertEquals(10, (long) writer.lastCommittedOffset);
Assert.assertEquals(10, writer.currentOffset);
String currentFileName = writer.fileWriter.currentFile.path;
@@ -300,7 +297,8 @@ public class TopicPartitionWriterTest {
private Map<String, String> getKustoConfigs(String basePath, long fileThreshold,
long flushInterval) {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_INGEST_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
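Note: several assertions in TopicPartitionWriterTest were flipped so the expected value comes first. JUnit 4 declares Assert.assertEquals(expected, actual), and the argument order drives the failure message; a small self-contained illustration:

import org.junit.Assert;
import org.junit.Test;

public class AssertOrderExample {
    @Test
    public void expectedValueComesFirst() {
        long currentOffset = 16;
        // With the literal first, a mismatch reports "expected:<16> but was:<...>"
        // rather than swapping the two values in the message.
        Assert.assertEquals(16, currentOffset);
    }
}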