* streaming

* streaming

* test

* 2.2.0

* 2.2.0

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
Co-authored-by: KustoIbizaExtension Build <kustodev@microsoft.com>
ohad bitton 2021-09-13 15:42:22 +03:00 committed by GitHub
Parent fe8e74ac2e
Commit 6dbd611a0f
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 181 additions and 74 deletions

pom.xml

@@ -9,7 +9,7 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
-<version>2.1.0</version>
+<version>2.2.0</version>
<properties>
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>

FileWriter.java

@@ -56,7 +56,7 @@ public class FileWriter implements Closeable {
private String flushError;
private RecordWriterProvider recordWriterProvider;
private RecordWriter recordWriter;
-private final IngestionProperties ingestionProps;
+private final IngestionProperties.DATA_FORMAT format;
private BehaviorOnError behaviorOnError;
private boolean shouldWriteAvroAsBytes = false;
@@ -73,7 +73,7 @@ public class FileWriter implements Closeable {
Function<Long, String> getFilePath,
long flushInterval,
ReentrantReadWriteLock reentrantLock,
-IngestionProperties ingestionProps,
+IngestionProperties.DATA_FORMAT format,
BehaviorOnError behaviorOnError) {
this.getFilePath = getFilePath;
this.basePath = basePath;
@@ -87,7 +87,7 @@ public class FileWriter implements Closeable {
// If we failed on flush we want to throw the error from the put() flow.
flushError = null;
-this.ingestionProps = ingestionProps;
+this.format = format;
}
@@ -247,7 +247,6 @@ public class FileWriter implements Closeable {
resetFlushTimer(true);
}
recordWriter.write(record);
-recordWriter.commit();
currentFile.records.add(record);
currentFile.rawBytes = countingStream.numBytes;
currentFile.numRecords++;
@@ -262,14 +261,14 @@ public class FileWriter implements Closeable {
recordWriterProvider = new JsonRecordWriterProvider();
}
else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.STRUCT)) {
-if (ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.json.toString())) {
+if (format.equals(IngestionProperties.DATA_FORMAT.json)) {
recordWriterProvider = new JsonRecordWriterProvider();
-} else if(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())) {
+} else if(format.equals(IngestionProperties.DATA_FORMAT.avro)) {
recordWriterProvider = new AvroRecordWriterProvider();
} else {
throw new ConnectException(String.format("Invalid Kusto table mapping, Kafka records of type "
+ "Avro and JSON can only be ingested to Kusto table having Avro or JSON mapping. "
+ "Currently, it is of type %s.", ingestionProps.getDataFormat()));
+ "Currently, it is of type %s.", format));
}
}
else if ((record.valueSchema() == null) || (record.valueSchema().type() == Schema.Type.STRING)){
@@ -277,7 +276,7 @@ public class FileWriter implements Closeable {
}
else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.BYTES)){
recordWriterProvider = new ByteRecordWriterProvider();
-if(ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())) {
+if(format.equals(IngestionProperties.DATA_FORMAT.avro)) {
shouldWriteAvroAsBytes = true;
}
} else {

KustoSinkConfig.java

@@ -58,8 +58,12 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_AUTH_AUTHORITY_DISPLAY = "Kusto Auth Authority";
static final String KUSTO_TABLES_MAPPING_CONF = "kusto.tables.topics.mapping";
-private static final String KUSTO_TABLES_MAPPING_DOC = "Kusto target tables mapping (per topic mapping, "
-+ "'topic1:table1;topic2:table2;').";
+private static final String KUSTO_TABLES_MAPPING_DOC = "A JSON array mapping ingestion from topic to table, e.g.: "
++ "[{'topic1':'t1','db':'kustoDb', 'table': 'table1', 'format': 'csv', 'mapping': 'csvMapping', 'streaming': 'false'}..].\n"
++ "Streaming is optional, defaults to false. Mind the usage and COGS of streaming ingestion, read here: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-streaming.\n"
++ "Note: If the streaming ingestion fails transiently,"
++ " queued ingest would apply for this specific batch ingestion. Batching latency is configured regularly via "
++ "ingestion batching policy";
private static final String KUSTO_TABLES_MAPPING_DISPLAY = "Kusto Table Topics Mapping";
static final String KUSTO_SINK_TEMP_DIR_CONF = "tempdir.path";
@@ -74,7 +78,7 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_SINK_FLUSH_INTERVAL_MS_CONF = "flush.interval.ms";
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DOC = "Kusto sink max staleness in milliseconds (per topic+partition combo).";
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY = "Maximum Flush Interval";
static final String KUSTO_BEHAVIOR_ON_ERROR_CONF = "behavior.on.error";
private static final String KUSTO_BEHAVIOR_ON_ERROR_DOC = "Behavior on error setting for "
+ "ingestion of records into Kusto table. "
@@ -239,7 +243,8 @@
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
-KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY);
+KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY
+);
}
private static void defineConnectionConfigs(ConfigDef result) {
@@ -299,7 +304,7 @@
KUSTO_AUTH_AUTHORITY_DISPLAY);
}
-public String getKustoUrl() {
+public String getKustoIngestUrl() {
return this.getString(KUSTO_INGEST_URL_CONF);
}
@@ -377,7 +382,7 @@
public long getRetryBackOffTimeMs() {
return this.getLong(KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF);
}
public static void main(String[] args) {
System.out.println(getConfig().toEnrichedRst());
}
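For reference, the new kusto.tables.topics.mapping value documented above is a JSON array with one object per topic; it replaces the old 'topic1:table1;topic2:table2;' form. A minimal sketch of a value with one queued and one streaming topic (topic, database, table, and mapping names here are illustrative, not taken from this commit):

[
  {"topic": "events", "db": "kustoDb", "table": "Events", "format": "json", "mapping": "EventsMapping"},
  {"topic": "metrics", "db": "kustoDb", "table": "Metrics", "format": "csv", "mapping": "MetricsMapping", "streaming": "true"}
]

Omitting 'streaming' (or setting it to 'false') keeps the default queued ingestion path.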

KustoSinkTask.java

@@ -5,6 +5,7 @@ import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import com.microsoft.azure.kusto.data.exceptions.KustoDataException;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
@@ -38,33 +39,49 @@ public class KustoSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
-public static final String FETCH_TABLE_QUERY = "%s | count";
-public static final String FETCH_TABLE_MAPPING_QUERY = ".show table %s ingestion %s mapping '%s'";
-public static final String FETCH_PRINCIPAL_ROLES_QUERY = ".show principal access with (principal = '%s', accesstype='ingest',database='%s',table='%s')";
+public static final String FETCH_TABLE_COMMAND = "%s | count";
+public static final String FETCH_TABLE_MAPPING_COMMAND = ".show table %s ingestion %s mapping '%s'";
+public static final String FETCH_PRINCIPAL_ROLES_COMMAND = ".show principal access with (principal = '%s', accesstype='ingest',database='%s',table='%s')";
+public static final String STREAMING_POLICY_SHOW_COMMAND = ".show %s %s policy streamingingestion";
public static final int INGESTION_ALLOWED_INDEX = 3;
public static final String MAPPING = "mapping";
public static final String MAPPING_FORMAT = "format";
public static final String MAPPING_TABLE = "table";
+public static final String DATABASE = "database";
public static final String MAPPING_DB = "db";
public static final String VALIDATION_OK = "OK";
+public static final String STREAMING = "streaming";
private final Set<TopicPartition> assignment;
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
private KustoSinkConfig config;
protected IngestClient kustoIngestClient;
+protected IngestClient streamingIngestClient;
protected Map<TopicPartition, TopicPartitionWriter> writers;
private boolean isDlqEnabled;
private String dlqTopicName;
private Producer<byte[], byte[]> dlqProducer;
-private static final ClientRequestProperties clientRequestProperties = new ClientRequestProperties();
+private static final ClientRequestProperties validateOnlyClientRequestProperties = new ClientRequestProperties();
public KustoSinkTask() {
assignment = new HashSet<>();
writers = new HashMap<>();
-clientRequestProperties.setOption("validate_permissions", true);
+validateOnlyClientRequestProperties.setOption("validate_permissions", true);
// TODO we should check ingestor role differently
}
-public static IngestClient createKustoIngestClient(KustoSinkConfig config) {
+private static boolean isStreamingEnabled(KustoSinkConfig config) throws JSONException {
+JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
+for (int i = 0; i < mappings.length(); i++) {
+JSONObject mapping = mappings.getJSONObject(i);
+if (mapping.optBoolean(STREAMING)) {
+return true;
+}
+}
+return false;
+}
+public void createKustoIngestClient(KustoSinkConfig config) {
try {
if (!Strings.isNullOrEmpty(config.getAuthAppid())) {
if (Strings.isNullOrEmpty(config.getAuthAppkey())) {
@@ -72,14 +89,24 @@ public class KustoSinkTask extends SinkTask {
}
ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
-config.getKustoUrl(),
+config.getKustoIngestUrl(),
config.getAuthAppid(),
config.getAuthAppkey(),
config.getAuthAuthority()
);
kcsb.setClientVersionForTracing(Version.CLIENT_NAME + ":" + Version.getVersion());
+if (isStreamingEnabled(config)){
+ConnectionStringBuilder engineKcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
+config.getKustoEngineUrl(),
+config.getAuthAppid(),
+config.getAuthAppkey(),
+config.getAuthAuthority()
+);
+streamingIngestClient = IngestClientFactory.createManagedStreamingIngestClient(kcsb, engineKcsb);
+}
-return IngestClientFactory.createClient(kcsb);
+kustoIngestClient = IngestClientFactory.createClient(kcsb);
+return;
}
throw new ConfigException("Failed to initialize KustoIngestClient, please " +
@@ -129,6 +156,7 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString(MAPPING_TABLE);
String format = mapping.optString(MAPPING_FORMAT);
+boolean streaming = mapping.optBoolean(STREAMING);
IngestionProperties props = new IngestionProperties(db, table);
@@ -155,6 +183,7 @@
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.ingestionProperties = props;
+topicIngestionProperties.streaming = streaming;
result.put(mapping.getString("topic"), topicIngestionProperties);
}
return result;
@@ -206,7 +235,7 @@
String database = testMapping.getString(MAPPING_DB);
String table = testMapping.getString(MAPPING_TABLE);
try {
-engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table), clientRequestProperties);
+engineClient.execute(database, String.format(FETCH_TABLE_COMMAND, table), validateOnlyClientRequestProperties);
} catch (DataServiceException | DataClientException err) {
if (err.getCause().getMessage().contains("Forbidden:")) {
log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings ");
@@ -235,13 +264,19 @@
String table = mapping.getString(MAPPING_TABLE);
String format = mapping.getString(MAPPING_FORMAT);
String mappingName = mapping.getString(MAPPING);
+boolean streamingEnabled = mapping.optBoolean(STREAMING);
if (isDataFormatAnyTypeOfJson(format)) {
format = IngestionProperties.DATA_FORMAT.json.name();
}
boolean hasAccess = false;
+boolean shouldCheckStreaming = streamingEnabled;
+try {
+if (shouldCheckStreaming && isStreamingPolicyEnabled(DATABASE, database, engineClient, database)){
+shouldCheckStreaming = false;
+}
try {
-KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table), clientRequestProperties);
+KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_COMMAND, table), validateOnlyClientRequestProperties);
if (VALIDATION_OK.equals(rs.getPrimaryResults().getData().get(0).get(0))) {
hasAccess = true;
}
@@ -250,20 +285,20 @@
}
if (hasAccess) {
try {
-engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_QUERY, table, format, mappingName));
+engineClient.execute(database, String.format(FETCH_TABLE_MAPPING_COMMAND, table, format, mappingName));
} catch (DataServiceException e) {
hasAccess = false;
databaseTableErrorList.add(String.format("Database:%s Table:%s | %s mapping '%s' not found, with exception '%s'", database, table, format, mappingName, ExceptionUtils.getStackTrace(e)));
}
}
if (hasAccess) {
String authenticateWith = "aadapp=" + config.getAuthAppid();
String query = String.format(FETCH_PRINCIPAL_ROLES_QUERY, authenticateWith, database, table);
String authenticateWith = String.format("aadapp=%s;%s", config.getAuthAppid(), config.getAuthAuthority());
String query = String.format(FETCH_PRINCIPAL_ROLES_COMMAND, authenticateWith, database, table);
try {
KustoOperationResult rs = engineClient.execute(database, query);
hasAccess = (boolean) rs.getPrimaryResults().getData().get(0).get(INGESTION_ALLOWED_INDEX);
if (hasAccess) {
log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
} else {
accessErrorList.add(String.format("User does not have appropriate permissions " +
"to sink data into the Kusto database %s", database));
@@ -277,7 +312,12 @@
}
}
}
-} catch (DataClientException e) {
+if (hasAccess && shouldCheckStreaming && !isStreamingPolicyEnabled(MAPPING_TABLE, table, engineClient, database)) {
+databaseTableErrorList.add(String.format("Ingestion is configured as streaming, but a streaming" +
+" ingestion policy was not found on either database '%s' or table '%s'", database, table));
+}
+} catch (KustoDataException e) {
throw new ConnectException("Unable to connect to ADX(Kusto) instance", e);
}
}
@@ -297,7 +337,8 @@
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped " +
"for the topic: %s. please check your configuration.", tp.topic()));
} else {
-TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, ingestionProps, config, isDlqEnabled, dlqTopicName, dlqProducer);
+IngestClient client = ingestionProps.streaming ? streamingIngestClient : kustoIngestClient;
+TopicPartitionWriter writer = new TopicPartitionWriter(tp, client, ingestionProps, config, isDlqEnabled, dlqTopicName, dlqProducer);
writer.open();
writers.put(tp, writer);
}
@@ -320,7 +361,7 @@
@Override
public void start(Map<String, String> props) {
config = new KustoSinkConfig(props);
-String url = config.getKustoUrl();
+String url = config.getKustoIngestUrl();
validateTableMappings(config);
if (config.isDlqEnabled()) {
@@ -343,7 +384,7 @@
topicsToIngestionProps = getTopicsToIngestionProps(config);
// this should be read properly from settings
-kustoIngestClient = createKustoIngestClient(config);
+createKustoIngestClient(config);
log.info("Started KustoSinkTask with target cluster: ({}), source topics: ({})", url, topicsToIngestionProps.keySet());
// Adding this check to make code testable
@@ -352,6 +393,14 @@
}
}
+private static boolean isStreamingPolicyEnabled (
+String entityType, String entityName, Client engineClient, String database
+) throws DataClientException, DataServiceException {
+KustoResultSetTable res = engineClient.execute(database, String.format(STREAMING_POLICY_SHOW_COMMAND, entityType, entityName)).getPrimaryResults();
+res.next();
+return res.getString("Policy") != null;
+}
@Override
public void stop() {
log.warn("Stopping KustoSinkTask");

TopicIngestionProperties.java

@@ -3,6 +3,6 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
class TopicIngestionProperties {
IngestionProperties ingestionProperties;
+boolean streaming;
}

TopicPartitionWriter.java

@@ -1,17 +1,21 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
+import com.google.common.base.Strings;
+import com.microsoft.azure.kusto.data.exceptions.KustoDataException;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
+import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.result.*;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
+import com.microsoft.azure.storage.StorageException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -22,8 +26,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
@@ -38,7 +42,7 @@ class TopicPartitionWriter {
private final TopicPartition tp;
private final IngestClient client;
-private final IngestionProperties ingestionProps;
+private final TopicIngestionProperties ingestionProps;
private final String basePath;
private final long flushInterval;
private final long fileThreshold;
@@ -58,7 +62,7 @@ class TopicPartitionWriter {
{
this.tp = tp;
this.client = client;
-this.ingestionProps = ingestionProps.ingestionProperties;
+this.ingestionProps = ingestionProps;
this.fileThreshold = config.getFlushSizeBytes();
this.basePath = getTempDirectoryName(config.getTempDirPath());
this.flushInterval = config.getFlushInterval();
@@ -70,7 +74,6 @@ class TopicPartitionWriter {
this.isDlqEnabled = isDlqEnabled;
this.dlqTopicName = dlqTopicName;
this.dlqProducer = dlqProducer;
}
public void handleRollFile(SourceFile fileDescriptor) {
@@ -87,22 +90,64 @@ class TopicPartitionWriter {
*/
for (int retryAttempts = 0; true; retryAttempts++) {
try {
-client.ingestFromFile(fileSourceInfo, ingestionProps);
+IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
+if (ingestionProps.streaming && ingestionResult instanceof IngestionStatusResult) {
+// If IngestionStatusResult returned then the ingestion status is from streaming ingest
+IngestionStatus ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
+if (!hasStreamingSucceeded(ingestionStatus)) {
+retryAttempts += ManagedStreamingIngestClient.MAX_RETRY_CALLS;
+backOffForRemainingAttempts(retryAttempts, null, fileDescriptor);
+continue;
+}
+}
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
this.lastCommittedOffset = currentOffset;
return;
-} catch (IngestionServiceException exception) {
+} catch (IngestionServiceException | StorageException exception) {
+if (ingestionProps.streaming && exception instanceof IngestionServiceException){
+Throwable innerException = exception.getCause();
+if (innerException instanceof KustoDataException &&
+((KustoDataException) innerException).isPermanent()){
+throw new ConnectException(exception);
+}
+}
// TODO : improve handling of specific transient exceptions once the client supports them.
// retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
-} catch (IngestionClientException exception) {
+} catch (IngestionClientException | URISyntaxException exception) {
throw new ConnectException(exception);
}
}
}
-private void backOffForRemainingAttempts(int retryAttempts, Exception e, SourceFile fileDescriptor) {
+private boolean hasStreamingSucceeded(IngestionStatus status) throws URISyntaxException, StorageException {
+switch (status.status){
+case Succeeded:
+case Queued:
+case Pending:
+return true;
+case Skipped:
+case PartiallySucceeded:
+String failureStatus = status.getFailureStatus();
+String details = status.getDetails();
+UUID ingestionSourceId = status.getIngestionSourceId();
+log.warn("A batch of streaming records has {} ingestion: table:{}, database:{}, operationId: {}," +
+"ingestionSourceId: {}{}{}.\n" +
+"Status is final and therefore ingestion won't be retried and data won't reach dlq",
+status.getStatus(),
+status.getTable(),
+status.getDatabase(),
+status.getOperationId(),
+ingestionSourceId,
+(!Strings.isNullOrEmpty(failureStatus) ? (", failure: " + failureStatus) : ""),
+(!Strings.isNullOrEmpty(details) ? (", details: " + details) : ""));
+return true;
+case Failed:
+}
+return false;
+}
+private void backOffForRemainingAttempts(int retryAttempts, Exception exce, SourceFile fileDescriptor) {
if (retryAttempts < maxRetryAttempts) {
// RetryUtil can be deleted if exponential backOff is not required, currently using constant backOff.
// long sleepTimeMs = RetryUtil.computeExponentialBackOffWithJitter(retryAttempts, TimeUnit.SECONDS.toMillis(5));
@@ -115,14 +160,14 @@
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
-throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e);
+throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interrupted after retryAttempts=%s", retryAttempts+1), exce);
}
} else {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e);
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", exce);
}
}
@@ -153,7 +198,7 @@
offset = offset == null ? currentOffset : offset;
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
-return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), COMPRESSION_EXTENSION)).toString();
+return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.ingestionProperties.getDataFormat(), COMPRESSION_EXTENSION)).toString();
}
void writeRecord(SinkRecord record) throws ConnectException {
@@ -193,7 +238,7 @@
this::getFilePath,
flushInterval,
reentrantReadWriteLock,
-ingestionProps,
+IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.ingestionProperties.getDataFormat()),
behaviorOnError);
}

E2ETest.java

@@ -15,10 +15,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
import java.io.FileInputStream;
import java.io.IOException;
@@ -65,7 +62,7 @@ public class E2ETest {
messagesBytes.add(messages[1].getBytes());
long flushInterval = 100;
-if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval)) {
+if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, false)) {
Assertions.fail("Test failed");
}
}
@@ -82,7 +79,7 @@
messagesBytes.add(messages[1].getBytes());
long flushInterval = 100;
-if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval)) {
+if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, false)) {
Assertions.fail("Test failed");
}
}
@@ -106,12 +103,14 @@
messagesBytes.add(message);
long flushInterval = 300000;
-if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval)) {
+if (!executeTest(dataFormat, ingestionMappingKind, mapping, messagesBytes, flushInterval, true)) {
Assertions.fail("Test failed");
}
}
-private boolean executeTest(String dataFormat, IngestionMapping.IngestionMappingKind ingestionMappingKind, String mapping, List<byte[]> messagesBytes, long flushInterval) throws URISyntaxException, DataServiceException, DataClientException {
+private boolean executeTest(String dataFormat, IngestionMapping.IngestionMappingKind ingestionMappingKind, String mapping,
+List<byte[]> messagesBytes, long flushInterval, boolean streaming)
+throws URISyntaxException, DataServiceException, DataClientException {
String table = tableBaseName + dataFormat;
String mappingReference = dataFormat + "Mapping";
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net/", cluster), appId, appKey, authority);
@@ -120,13 +119,16 @@
try {
if (tableBaseName.startsWith(testPrefix)) {
engineClient.execute(database, String.format(".create table %s (ColA:string,ColB:int)", table));
+if (streaming){
+engineClient.execute(database, ".clear database cache streamingingestion schema");
+}
}
engineClient.execute(database, String.format(".create table ['%s'] ingestion %s mapping '%s' " +
"'[" + mapping + "]'", table, dataFormat, mappingReference));
TopicPartition tp = new TopicPartition("testPartition" + dataFormat, 11);
ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://ingest-%s.kusto.windows.net", cluster), appId, appKey, authority);
-IngestClient ingestClient = IngestClientFactory.createClient(csb);
+IngestClient ingestClient = IngestClientFactory.createManagedStreamingIngestClient(csb,engineCsb);
IngestionProperties ingestionProperties = new IngestionProperties(database, table);
long fileThreshold = 100;
@@ -137,21 +139,25 @@
String kustoDmUrl = String.format("https://ingest-%s.kusto.windows.net", cluster);
String kustoEngineUrl = String.format("https://%s.kusto.windows.net", cluster);
String basepath = Paths.get(basePath, dataFormat).toString();
-Map<String, String> settings = getKustoConfigs(kustoDmUrl, kustoEngineUrl, basepath, mappingReference, fileThreshold, flushInterval);
+Map<String, String> settings = getKustoConfigs(
+kustoDmUrl, kustoEngineUrl, basepath, mappingReference, fileThreshold,
+flushInterval, tp, dataFormat, table, streaming);
KustoSinkConfig config = new KustoSinkConfig(settings);
-TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, config, isDlqEnabled, dlqTopicName, kafkaProducer);
-writer.open();
+KustoSinkTask kustoSinkTask = new KustoSinkTask();
+kustoSinkTask.start(settings);
+kustoSinkTask.open(new ArrayList<TopicPartition>(){ {add(tp);}});
List<SinkRecord> records = new ArrayList<>();
for (byte[] messageBytes : messagesBytes) {
records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, messageBytes, 10));
}
-for (SinkRecord record : records) {
-writer.writeRecord(record);
-}
+kustoSinkTask.put(records);
+// Streaming result should show
+int timeoutMs = streaming ? 0 : 60 * 6 * 1000;
-validateExpectedResults(engineClient, 2, table);
+validateExpectedResults(engineClient, 2, table, timeoutMs);
} catch (InterruptedException e) {
return false;
} finally {
@@ -163,12 +169,11 @@
return true;
}
-private void validateExpectedResults(Client engineClient, Integer expectedNumberOfRows, String table) throws InterruptedException, DataClientException, DataServiceException {
+private void validateExpectedResults(Client engineClient, Integer expectedNumberOfRows, String table, int timeoutMs) throws InterruptedException, DataClientException, DataServiceException {
String query = String.format("%s | count", table);
KustoResultSetTable res = engineClient.execute(database, query).getPrimaryResults();
res.next();
-int timeoutMs = 60 * 6 * 1000;
int rowCount = res.getInt(0);
int timeElapsedMs = 0;
int sleepPeriodMs = 5 * 1000;
@@ -185,11 +190,14 @@
}
private Map<String, String> getKustoConfigs(String clusterUrl, String engineUrl, String basePath, String tableMapping,
-long fileThreshold, long flushInterval) {
+long fileThreshold, long flushInterval, TopicPartition topic, String format,
+String table, boolean streaming) {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_INGEST_URL_CONF, clusterUrl);
settings.put(KustoSinkConfig.KUSTO_ENGINE_URL_CONF, engineUrl);
-settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, tableMapping);
+settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF,
+String.format("[{'topic': '%s','db': '%s', 'table': '%s','format': '%s', 'mapping':'%s' %s}]",
+topic.topic(), database, table, format, tableMapping, streaming ? ",'streaming':true" : ""));
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, appId);
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, appKey);
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, authority);

FileWriterTest.java

@@ -63,7 +63,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
-FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
+FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.getDataFormat()), BehaviorOnError.FAIL);
String msg = "Line number 1: This is a message from the other size";
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
fileWriter.initializeRecordWriter(record);
@@ -92,7 +92,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
-FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
+FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.getDataFormat()), BehaviorOnError.FAIL);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@@ -133,7 +133,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
-FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
+FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.getDataFormat()), BehaviorOnError.FAIL);
String msg = "Message";
SinkRecord record = new SinkRecord("topic", 1, null, null, null, msg, 10);
@@ -152,7 +152,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
-FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
+FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, new ReentrantReadWriteLock(), IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.getDataFormat()), BehaviorOnError.FAIL);
String msg2 = "Second Message";
SinkRecord record1 = new SinkRecord("topic", 1, null, null, null, msg2, 10);
@@ -201,7 +201,7 @@ public class FileWriterTest {
}
return Paths.get(path, Long.toString(offset)).toString();
};
-FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, reentrantReadWriteLock, ingestionProps, BehaviorOnError.FAIL);
+FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, reentrantReadWriteLock, IngestionProperties.DATA_FORMAT.valueOf(ingestionProps.getDataFormat()), BehaviorOnError.FAIL);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
@@ -228,7 +228,7 @@ public class FileWriterTest {
fileWriter2.writeData(record3);
reentrantReadWriteLock.readLock().unlock();
-Thread.sleep(510);
+Thread.sleep(550);
// Assertions
Assertions.assertEquals(2, files.size());

KustoSinkConnectorConfigTest.java

@@ -24,7 +24,7 @@ public class KustoSinkConnectorConfigTest {
public void shouldHaveDefaultValues() {
// Adding required Configuration with no default value.
KustoSinkConfig config = new KustoSinkConfig(setupConfigs());
-Assertions.assertNotNull(config.getKustoUrl());
+Assertions.assertNotNull(config.getKustoIngestUrl());
Assertions.assertTrue(config.getFlushSizeBytes() > 0);
Assertions.assertTrue(config.getFlushInterval() > 0);
Assertions.assertFalse(config.isDlqEnabled());

KustoSinkTaskTest.java

@@ -116,7 +116,8 @@ public class KustoSinkTaskTest {
Assertions.assertEquals("csv", kustoSinkTaskSpy.getIngestionProps("topic1").ingestionProperties.getDataFormat());
Assertions.assertEquals("db2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDatabaseName());
Assertions.assertEquals("table2", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName());
Assertions.assertEquals("json", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat());
Assertions.assertEquals("multijson",
kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat());
Assertions.assertEquals("Mapping", kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference());
Assertions.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
}