hasher 2020-06-21 15:42:40 +05:30
Parent ea6417f7a9
Commit 696f63f37c
15 changed files with 49 additions and 353 deletions

pom.xml · 63 changed lines · View file

@@ -69,7 +69,6 @@
</developers>
<properties>
<kafka.version>1.0.0</kafka.version>
<kafka.version.test>5.3.0-ccs</kafka.version.test>
<kafka.scala.version>2.12</kafka.scala.version>
<json.version>20090211</json.version>
<commonio.version>2.6</commonio.version>
@@ -156,66 +155,6 @@
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version.test}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version.test}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version.test}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version.test}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version.test}</version>
<type>test-jar</type>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version.test}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version.test}</version>
<type>test-jar</type>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<!-- Also include the test artifacts for Kafka clients for additional test utilities -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version.test}</version>
<classifier>test</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
@@ -229,7 +168,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>${kafka.version.test}</version>
<version>5.3.0-ccs</version>
<scope>compile</scope>
</dependency>
</dependencies>

FileWriter.java · View file

@@ -1,6 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.AvroRecordWriterProvider;
@@ -54,6 +55,7 @@ public class FileWriter implements Closeable {
private RecordWriter recordWriter;
private File file;
private KustoSinkConfig config;
private final IngestionProperties ingestionProps;
/**
* @param basePath - This is path to which to write the files to.
@@ -69,7 +71,8 @@ public class FileWriter implements Closeable {
long flushInterval,
boolean shouldCompressData,
ReentrantReadWriteLock reentrantLock,
KustoSinkConfig config) {
KustoSinkConfig config,
IngestionProperties ingestionProps) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
@@ -82,6 +85,7 @@ public class FileWriter implements Closeable {
// If we failed on flush we want to throw the error from the put() flow.
flushError = null;
this.ingestionProps = ingestionProps;
}
@@ -110,7 +114,7 @@ public class FileWriter implements Closeable {
currentFile = fileProps;
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
recordWriter = recordWriterProvider.getRecordWriter(config,currentFile.path,outputStream);
recordWriter = recordWriterProvider.getRecordWriter(config, currentFile.path, outputStream);
}
void rotate(@Nullable Long offset) throws IOException {
@@ -243,7 +247,11 @@ public class FileWriter implements Closeable {
recordWriterProvider = new JsonRecordWriterProvider();
}
else if (record.valueSchema().type() == Schema.Type.STRUCT) {
recordWriterProvider = new AvroRecordWriterProvider();
if (ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.json.toString())) {
recordWriterProvider = new JsonRecordWriterProvider();
} else {
recordWriterProvider = new AvroRecordWriterProvider();
}
}
else if (record.valueSchema().type() == Schema.Type.STRING){
recordWriterProvider = new StringRecordWriterProvider();
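
Net effect of the FileWriter changes above: the record writer is now chosen from both the record's value schema and the target table's ingestion format, so a STRUCT record bound for a table mapped with 'format': 'json' is written as JSON rather than Avro. A minimal self-contained sketch of that decision, assuming the provider classes live under the formatWriter package (as the visible AvroRecordWriterProvider import suggests) and that the IngestionProperties carry the format parsed from kusto.tables.topics.mapping:

    import com.microsoft.azure.kusto.ingest.IngestionProperties;
    import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
    import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.AvroRecordWriterProvider;
    import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.sink.SinkRecord;

    final class WriterChoiceSketch {
        // Mirrors the new STRUCT branch in FileWriter.initializeRecordWriter.
        static RecordWriterProvider chooseForStruct(SinkRecord record, IngestionProperties props) {
            if (record.valueSchema().type() != Schema.Type.STRUCT) {
                throw new IllegalArgumentException("sketch covers the STRUCT branch only");
            }
            boolean jsonTable = props.getDataFormat()
                    .equals(IngestionProperties.DATA_FORMAT.json.toString());
            // New behaviour: json-format tables get the JSON writer even for schema-backed
            // records; any other format keeps the previous Avro path.
            return jsonTable ? new JsonRecordWriterProvider() : new AvroRecordWriterProvider();
        }
    }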

KustoSinkConfig.java · View file

@@ -107,15 +107,6 @@ public class KustoSinkConfig extends AbstractConfig {
+ "the Connector makes to ingest records into KustoDB.";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY = "Retry BackOff Time";
private static final String AVRO_CODEC_CONFIG = "avro.codec";
private static final String AVRO_CODEC_DEFAULT = "null";
private static final String AVRO_CODEC_DISPLAY = "Avro Compression Codec";
private static final String AVRO_CODEC_DOC = "The Avro compression codec to be used for output "
+ "files. Available values: null, deflate, snappy and bzip2 (CodecSource is org.apache"
+ ".avro.file.CodecFactory)";
private static final String[] AVRO_SUPPORTED_CODECS = new String[]{"null", "deflate", "snappy",
"bzip2"};
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}
@@ -290,19 +281,7 @@ public class KustoSinkConfig extends AbstractConfig {
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY)
.define(
AVRO_CODEC_CONFIG,
Type.STRING,
AVRO_CODEC_DEFAULT,
ConfigDef.ValidString.in(AVRO_SUPPORTED_CODECS),
Importance.LOW,
AVRO_CODEC_DOC,
formatGroupName,
formatGroupNameOrder,
ConfigDef.Width.MEDIUM,
AVRO_CODEC_DISPLAY
);
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY);
}
public String getKustoUrl() {
@@ -365,10 +344,6 @@ public class KustoSinkConfig extends AbstractConfig {
return this.getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF);
}
public String getAvroCodec() {
return getString(AVRO_CODEC_CONFIG);
}
public static void main(String[] args) {
System.out.println(getConfig().toEnrichedRst());
}

View file

@@ -1,14 +1,11 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

TopicPartitionWriter.java · View file

@@ -109,7 +109,7 @@ class TopicPartitionWriter {
try {
reentrantReadWriteLock.readLock().lock();
this.currentOffset = record.kafkaOffset();
fileWriter.writeData(record,record.kafkaOffset());
fileWriter.writeData(record, record.kafkaOffset());
} catch (ConnectException ex) {
if (commitImmediately) {
throw ex;
@@ -136,7 +136,8 @@ class TopicPartitionWriter {
flushInterval,
shouldCompressData,
reentrantReadWriteLock,
config);
config,
ingestionProps);
}
void close() {

AvroRecordWriterProvider.java · View file

@@ -5,7 +5,6 @@ import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import io.confluent.connect.avro.AvroData;
import io.confluent.kafka.serializers.NonRecordContainer;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.kafka.connect.data.Schema;
@@ -37,7 +36,6 @@ public class AvroRecordWriterProvider implements RecordWriterProvider<KustoSinkC
log.info("Opening record writer for: {}", filename);
org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(schema);
writer.setFlushOnEveryBlock(true);
writer.setCodec(CodecFactory.fromString(conf.getAvroCodec()));
writer.create(avroSchema, out);
} catch (IOException e) {
throw new ConnectException(e);

JsonRecordWriterProvider.java · View file

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -43,11 +44,22 @@ public class JsonRecordWriterProvider implements RecordWriterProvider<KustoSinkC
long size =0;
@Override
public void write(SinkRecord record) {
log.info("Opening record writer for: {}", filename);
log.trace("Sink record: {}", record);
try {
Object value = record.value();
writer.writeObject(value);
writer.writeRaw(LINE_SEPARATOR);
if (value instanceof Struct) {
byte[] rawJson = converter.fromConnectData(
record.topic(),
record.valueSchema(),
value
);
out.write(rawJson);
out.write(LINE_SEPARATOR_BYTES);
} else {
writer.writeObject(value);
writer.writeRaw(LINE_SEPARATOR);
}
size+= (value.toString().getBytes().length + LINE_SEPARATOR.getBytes().length);
} catch (IOException e) {
throw new ConnectException(e);
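
The write() change above routes Struct values through Kafka Connect's JsonConverter, which renders the Struct together with its Schema as raw JSON bytes that are then written out followed by the line separator. A standalone illustration of that conversion (topic name, fields, and values are invented; the sketch assumes the converter is configured with schemas.enable=false so the output is the bare payload rather than the schema-and-payload envelope):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverter;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    final class StructToJsonSketch {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // false = configure as a value converter; schemas.enable=false drops the envelope
            converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

            Schema schema = SchemaBuilder.struct()
                    .field("name", Schema.STRING_SCHEMA)
                    .field("metric", Schema.INT32_SCHEMA)
                    .build();
            Struct value = new Struct(schema).put("name", "demo").put("metric", 7);

            byte[] rawJson = converter.fromConnectData("some-topic", schema, value);
            System.out.println(new String(rawJson, StandardCharsets.UTF_8));
            // prints: {"name":"demo","metric":7}
        }
    }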

E2ETest.java · View file

@@ -24,16 +24,17 @@ import java.util.logging.Logger;
public class E2ETest {
private static final String testPrefix = "tmpKafkaE2ETest";
private String appId = System.getProperty("appId","xxx");
private String appKey = System.getProperty("appKey","xxx");
private String authority = System.getProperty("authority","xxxx");
private String cluster = System.getProperty("cluster","xxxx");
private String database = System.getProperty("database","xxxx");
private String appId = System.getProperty("appId","e299f8c0-4965-4b1f-9b55-a56dd4b8f6c4");
private String appKey = System.getProperty("appKey",".1t-fP8o~a0b8X7-o0.wCr6.0aPBR7~k9L");
private String authority = System.getProperty("authority","7f66d0ea-6137-4e37-a835-4530eba9b3ee");
private String cluster = System.getProperty("cluster","azureconnector.centralus");
private String database = System.getProperty("database","anmol");
private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
@Test
@Ignore
public void testE2ECsv() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "csv";
Map<String, String> properties = getProperties();
@@ -90,7 +91,6 @@ public class E2ETest {
}
@Test
// @Ignore
public void testE2EAvro() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "avro";
Map<String, String> properties = getProperties();

FileWriterTest.java · View file

@@ -1,6 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.connect.data.Schema;
@@ -22,6 +23,7 @@ import java.util.zip.GZIPOutputStream;
public class FileWriterTest {
private File currentDirectory;
private KustoSinkConfig config;
IngestionProperties ingestionProps;
@Before
public final void before() {
@@ -31,6 +33,7 @@ public class FileWriterTest {
FileWriter.class.getSimpleName(),
String.valueOf(Instant.now().toEpochMilli())
).toString());
ingestionProps = new IngestionProperties("db", "table");
}
@After
@@ -59,7 +62,7 @@
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(),config);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), config, ingestionProps);
String msg = "Line number 1: This is a message from the other size";
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
fileWriter.initializeRecordWriter(record);
@@ -91,7 +94,7 @@
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(),config);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), config, ingestionProps);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@@ -132,7 +135,7 @@
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), config);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), config, ingestionProps);
String msg = "Message";
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
@@ -150,7 +153,7 @@
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), config);
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), config, ingestionProps);
String msg2 = "Second Message";
SinkRecord record1 = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg2.getBytes(), 10);
@@ -198,7 +201,7 @@
}
return Paths.get(path, Long.toString(offset)).toString();
};
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, config);
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, config, ingestionProps);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
@@ -256,7 +259,7 @@
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), config);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), config, ingestionProps);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
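
For quick reference, the constructor shape these tests now exercise; nothing changes except the trailing IngestionProperties argument (variable names are the test's own, the per-parameter comments are editorial and only approximate each role):

    // db/table placeholders, as in the test's own setup
    IngestionProperties ingestionProps = new IngestionProperties("db", "table");

    FileWriter fileWriter = new FileWriter(
            path,                         // basePath: directory for temporary files
            MAX_FILE_SIZE,                // fileThreshold in bytes
            trackFiles,                   // callback the tests use to track rolled files
            generateFileName,             // getFilePath: offset -> file path function
            30000,                        // flushInterval in milliseconds
            false,                        // shouldCompressData
            new ReentrantReadWriteLock(),
            config,                       // KustoSinkConfig
            ingestionProps);              // new parameter introduced by this commit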

AvroRecordWriterTest.java · View file

@@ -92,3 +92,4 @@ public class AvroRecordWriterTest {
return props;
}
}

ByteArrayWriterProviderTest.java · View file

@@ -61,3 +61,4 @@ public class ByteArrayWriterProviderTest {
return props;
}
}

JsonRecordWriterProviderTest.java · View file

@@ -62,3 +62,4 @@ public class JsonRecordWriterProviderTest {
return props;
}
}

StringRecordWriterProviderTest.java · View file

@@ -60,4 +60,4 @@ public class StringRecordWriterProviderTest {
props.put("kusto.tables.topics.mapping", "[{'topic': 'xxx','db': 'xxx', 'table': 'xxx','format': 'avro', 'mapping':'avri'}]");
return props;
}
}
}

AzureKustoConnectorIT.java (deleted) · View file

@@ -1,145 +0,0 @@
package com.microsoft.azure.kusto.kafka.connect.sink.integeration;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.*;
public class AzureKustoConnectorIT extends BaseConnectorIT {
private static final Logger log = LoggerFactory.getLogger(AzureKustoConnectorIT.class);
private static final List<String> KAFKA_TOPICS = Arrays.asList("kafka1");
private static final int NUM_OF_PARTITION = 5;
private static final int NUM_RECORDS_PRODUCED_PER_PARTITION = 100;
private static final String CONNECTOR_NAME = "azure-kusto-connector";
private String appId = System.getProperty("appId","xxxx");
private String appKey = System.getProperty("appKey","xxxxx");
private String authority = System.getProperty("authority","xxxx");
private String cluster = System.getProperty("cluster","xxxx");
private String database = System.getProperty("database","xxxxx");
Client engineClient;
String table;
@Before
public void setup() throws Exception {
startConnect();
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
engineClient = ClientFactory.createClient(engineCsb);
}
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
@After
public void close() {
stopConnect();
}
@Test
public void testWithCsvData() throws DataClientException, DataServiceException, IOException, InterruptedException {
table = "CsvTable";
Map<String, String> props = getProperties();
props.put("tempdir.path",Paths.get(basePath, "csv").toString());
props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
props.put("key.converter","org.apache.kafka.connect.storage.StringConverter");
connect.kafka().createTopic(KAFKA_TOPICS.get(0), NUM_OF_PARTITION);
produceCsvRecords();
engineClient.execute(database, String.format(".create table %s (ColA:string,ColB:int)", table));
engineClient.execute(database, String.format(".create table ['%s'] ingestion csv mapping 'mappy' " +
"'[" +
"{\"column\":\"ColA\", \"DataType\":\"string\", \"Properties\":{\"transform\":\"SourceLocation\"}}," +
"{\"column\":\"ColB\", \"DataType\":\"int\", \"Properties\":{\"Ordinal\":\"1\"}}," +
"]'", table));
// start a sink connector
props.put("kusto.tables.topics.mapping", "[{'topic': 'kafka1','db':'" + database + "', 'table': '" + table + "','format': 'csv', 'mapping':'mappy'}]");
connect.configureConnector(CONNECTOR_NAME, props);
// wait for tasks to spin up
waitForConnectorToStart(CONNECTOR_NAME, 1);
log.error("Waiting for records in destination topic ...");
validateExpectedResults(NUM_RECORDS_PRODUCED_PER_PARTITION * NUM_OF_PARTITION);
engineClient.execute(database, ".drop table " + table);
}
@Test
public void testWithJsonData() throws Exception {
table = "JsonTable";
connect.kafka().createTopic(KAFKA_TOPICS.get(0), NUM_OF_PARTITION);
produceJsonRecords();
engineClient.execute(database, String.format(".create table %s (TimeStamp: datetime, Name: string, Metric: int, Source:string)",table));
engineClient.execute(database, String.format(".create table %s ingestion json mapping 'jsonMapping' '[{\"column\":\"TimeStamp\",\"path\":\"$.timeStamp\",\"datatype\":\"datetime\"},{\"column\":\"Name\",\"path\":\"$.name\",\"datatype\":\"string\"},{\"column\":\"Metric\",\"path\":\"$.metric\",\"datatype\":\"int\"},{\"column\":\"Source\",\"path\":\"$.source\",\"datatype\":\"string\"}]'",table));
Map<String, String> props = getProperties();
props.put("tempdir.path",Paths.get(basePath, "json").toString());
props.put("kusto.tables.topics.mapping","[{'topic': 'kafka1','db': 'anmol', 'table': '"+ table +"','format': 'json', 'mapping':'jsonMapping'}]");
props.put("value.converter","org.apache.kafka.connect.json.JsonConverter");
props.put("key.converter","org.apache.kafka.connect.json.JsonConverter");
props.put("value.converter.schemas.enable","false");
props.put("key.converter.schemas.enable","false");
props.put("max.retries","0");
// start a sink connector
connect.configureConnector(CONNECTOR_NAME, props);
// wait for tasks to spin up
waitForConnectorToStart(CONNECTOR_NAME, 1);
log.error("Waiting for records in destination topic ...");
validateExpectedResults(NUM_RECORDS_PRODUCED_PER_PARTITION * NUM_OF_PARTITION);
engineClient.execute(database, ".drop table " + table);
}
private void produceCsvRecords(){
for(int i = 0; i<NUM_OF_PARTITION;i++){
for (int j = 0; j < NUM_RECORDS_PRODUCED_PER_PARTITION; j++) {
String kafkaTopic = KAFKA_TOPICS.get(j % KAFKA_TOPICS.size());
log.debug("Sending message {} with topic {} to Kafka broker {}", kafkaTopic);
connect.kafka().produce(kafkaTopic,i, null,String.format("stringy message,%s",i));
}
}
}
private void produceJsonRecords(){
for(int i = 0; i<NUM_OF_PARTITION;i++){
for (int j = 0; j < NUM_RECORDS_PRODUCED_PER_PARTITION; j++) {
String kafkaTopic = KAFKA_TOPICS.get(j % KAFKA_TOPICS.size());
log.debug("Sending message {} with topic {} to Kafka broker {}", kafkaTopic);
connect.kafka().produce(kafkaTopic,i, null,String.format("{\"timestamp\" : \"2017-07-23 13:10:11\",\"name\" : \"Anmol\",\"metric\" : \"%s\",\"source\" : \"Demo\"}",i));
}
}
}
private void validateExpectedResults(Integer expectedNumberOfRows) throws InterruptedException, DataClientException, DataServiceException {
String query = String.format("%s | count", table);
KustoResultSetTable res = engineClient.execute(database, query).getPrimaryResults();
res.next();
Integer timeoutMs = 60 * 9 * 1000;
Integer rowCount = res.getInt(0);
Integer timeElapsedMs = 0;
Integer sleepPeriodMs = 5 * 1000;
while (rowCount < expectedNumberOfRows && timeElapsedMs < timeoutMs) {
Thread.sleep(sleepPeriodMs);
res = engineClient.execute(database, query).getPrimaryResults();
res.next();
rowCount = res.getInt(0);
timeElapsedMs += sleepPeriodMs;
}
Assertions.assertEquals(rowCount, expectedNumberOfRows);
this.log.info("Succesfully ingested " + expectedNumberOfRows + " records.");
}
}

BaseConnectorIT.java (deleted) · View file

@@ -1,95 +0,0 @@
package com.microsoft.azure.kusto.kafka.connect.sink.integeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category(IntegrationTest.class)
public abstract class BaseConnectorIT {
private static final Logger log = LoggerFactory.getLogger(BaseConnectorIT.class);
protected static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(45);
protected static final long CONNECTOR_STARTUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
protected static final int TASKS_MAX = 1;
protected EmbeddedConnectCluster connect;
protected void startConnect() throws Exception {
connect = new EmbeddedConnectCluster.Builder()
.name("my-connect-cluster")
.build();
connect.start();
}
protected void stopConnect() {
try {
connect.stop();
} catch (Exception ne) {
ne.printStackTrace();
}
}
/**
* Wait up to {@link #CONNECTOR_STARTUP_DURATION_MS maximum time limit} for the connector with the given
* name to start the specified number of tasks.
*
* @param name the name of the connector
* @param numTasks the minimum number of tasks that are expected
* @return the time this method discovered the connector has started, in milliseconds past epoch
* @throws InterruptedException if this was interrupted
*/
protected long waitForConnectorToStart(String name, int numTasks) throws InterruptedException {
TestUtils.waitForCondition(
() -> assertConnectorAndTasksRunning(name, numTasks).orElse(false),
CONNECTOR_STARTUP_DURATION_MS,
"Connector tasks did not start in time."
);
return System.currentTimeMillis();
}
/**
* Confirm that a connector with an exact number of tasks is running.
*
* @param connectorName the connector
* @param numTasks the expected number of tasks
* @return true if the connector and tasks are in RUNNING state; false otherwise
*/
protected Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
try {
ConnectorStateInfo info = connect.connectorStatus(connectorName);
boolean result = info != null
&& info.tasks().size() == numTasks
&& info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
&& info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
return Optional.of(result);
} catch (Exception e) {
log.warn("Could not check connector state info.");
return Optional.empty();
}
}
protected Map<String, String> getProperties() {
Map<String, String> props = new HashMap<>();
props.put("connector.class", "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector");
props.put("bootstrap.servers",connect.kafka().bootstrapServers());
props.put("topics","kafka1");
props.put("tasks.max","1");
props.put("kusto.url","xxxx");
props.put("kusto.auth.authority","xxxx");
props.put("kusto.auth.appid","xxxx");
props.put("kusto.auth.appkey","xxxx");
props.put("value.converter.schemas.enable","false");
props.put("key.converter.schemas.enable","false");
props.put("kusto.sink.flush_size", "10000");
props.put("value.converter","org.apache.kafka.connect.storage.StringConverter");
props.put("key.converter","org.apache.kafka.connect.storage.StringConverter");
return props;
}
}