added retries and DLQ for failed records

fahad 2020-06-15 01:49:16 +05:30
Parent 96dc07bea8
Commit 4d6474c66a
7 changed files: 397 additions and 155 deletions

View file

@ -47,21 +47,40 @@ Go to `http://localhost:3030/kafka-connect-ui/#/cluster/fast-data-dev/` and usin
example configuration:
```config
name=KustoSinkConnector
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=1
topics=testing1
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'},]
kusto.auth.authority=XXX
topics=test-topic
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
kusto.url=https://ingest-mycluster.kusto.windows.net/
kusto.auth.appid=XXX
kusto.auth.appkey=XXX
kusto.auth.username
kusto.auth.password
aad.auth.appid
aad.auth.appkey
aad.auth.authority
kusto.sink.tempdir=/var/tmp/
kusto.sink.flush_size=1000
kusto.sink.flush_interval_ms=300000
flush.size.bytes=1000
flush.interval.ms=300000
error.tolerance=NONE
dlq.bootstrap.servers=localhost:9092
dlq.topic.name=test-topic-error
max.retry.time.ms=60000
retry.backoff.time.ms=5000
```
Aggregation in the sink is done using files; these are sent to Kusto when the aggregated file reaches `flush.size.bytes` (in bytes) or when the `flush.interval.ms` interval has passed.
For the Confluent connector parameters, please refer to https://docs.confluent.io/2.0.0/connect/userguide.html#configuring-connectors
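As a worked example of how the new retry settings interact (a sketch based on the `getMaxRetryAttempts()` getter added in this commit, not an official formula): the connector derives the number of retries from `max.retry.time.ms` divided by `retry.backoff.time.ms`, so the values above allow roughly 60000 / 5000 = 12 retries, spaced 5 seconds apart, before records are routed to the DLQ.
```java
// Illustrative arithmetic only; mirrors KustoSinkConfig.getMaxRetryAttempts() added in this commit.
long maxRetryTimeMs = 60_000L;  // max.retry.time.ms from the example config
long retryBackoffMs = 5_000L;   // retry.backoff.time.ms from the example config
long retries = maxRetryTimeMs / retryBackoffMs; // 12 retries, each after a 5s backoff
```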

View file

@ -1,13 +1,35 @@
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
#kusto.sink.flush_interval_ms=1000 * 60
tasks.max=1
#topics=topic1,topic2
#kusto.tables.topics_mapping=[{'topic': 'topic1','db': 'db_name', 'table': 'table_name','format': 'json', 'mapping':'Mapping'},{'topic': 'topic2','db': 'db_name', 'table': 'table_name2'}]
#kusto.auth.authority=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#kusto.url=https://ingest-{cluster}.kusto.windows.net/
#kusto.auth.appid=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#kusto.auth.appkey=ZZZZZZZZZZZZZZZZZZZZZZZ
#kusto.tables.topics.mapping=[{'topic': 'topic1','db': 'db_name', 'table': 'table_name','format': 'json', 'mapping':'Mapping'},{'topic': 'topic2','db': 'db_name', 'table': 'table_name2'}]
#kusto.auth.username=user1
#kusto.auth.password=xxxxxxxx
#aad.auth.appid=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#aad.auth.appkey=ZZZZZZZZZZZZZZZZZZZZZZZ
#aad.auth.authority=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#kusto.sink.tempdir=/var/tmp/
#kusto.sink.flush_size=1000
#flush.size.bytes=1000
#flush.interval.ms=1000 * 60
#error.tolerance=NONE
#dlq.bootstrap.servers=localhost:9092
#dlq.topic.name=test-topic-error
#max.retry.time.ms=60000
#retry.backoff.time.ms=5000
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
####Deprecated Configurations####
#kusto.tables.topics_mapping=[{'topic': 'topic1','db': 'db_name', 'table': 'table_name','format': 'json', 'mapping':'Mapping'},{'topic': 'topic2','db': 'db_name', 'table': 'table_name2'}]
#kusto.sink.flush_size=1000
#kusto.sink.flush_interval_ms=1000 * 60

View file

@ -2,6 +2,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Function;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,6 +17,7 @@ import java.io.OutputStream;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.zip.GZIPOutputStream;
/**
@ -28,7 +30,7 @@ public class FileWriter implements Closeable {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
SourceFile currentFile;
private Timer timer;
private Function<SourceFile, String> onRollCallback;
private Consumer<SourceFile> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Function<Long, String> getFilePath;
@ -51,7 +53,7 @@ public class FileWriter implements Closeable {
*/
public FileWriter(String basePath,
long fileThreshold,
Function<SourceFile, String> onRollCallback,
Consumer<SourceFile> onRollCallback,
Function<Long, String> getFilePath,
long flushInterval,
boolean shouldCompressData,
@ -75,25 +77,26 @@ public class FileWriter implements Closeable {
return this.currentFile != null && this.currentFile.rawBytes > 0;
}
public synchronized void write(byte[] data, @Nullable Long offset) throws IOException {
public synchronized void write(byte[] data, @Nullable SinkRecord record) throws IOException {
if (flushError != null) {
throw new ConnectException(flushError);
}
if (data == null || data.length == 0) return;
if (currentFile == null) {
openFile(offset);
openFile(record.kafkaOffset());
resetFlushTimer(true);
}
outputStream.write(data);
currentFile.records.add(record);
currentFile.rawBytes += data.length;
currentFile.zippedBytes += countingStream.numBytes;
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) {
rotate(offset);
rotate(record.kafkaOffset());
resetFlushTimer(true);
}
}
@ -136,10 +139,7 @@ public class FileWriter implements Closeable {
} else {
outputStream.flush();
}
String err = onRollCallback.apply(currentFile);
if(err != null){
throw new ConnectException(err);
}
onRollCallback.accept(currentFile);
if (delete){
dumpFile();
}
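The `Consumer<SourceFile>` change above means the roll callback no longer signals failure by returning an error string; it is expected to throw instead. A minimal hypothetical sketch of such a callback (`tryIngest` is a stand-in name, not part of this commit):
```java
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;

// Hypothetical callback: failures propagate as exceptions instead of returned strings,
// which is why FileWriter now calls onRollCallback.accept(currentFile) with no error check.
Consumer<SourceFile> onRollCallback = sourceFile -> {
    if (!tryIngest(sourceFile)) {              // tryIngest: assumed helper for illustration
        throw new ConnectException("Failed to ingest " + sourceFile.path);
    }
};
```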

View file

@ -1,6 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@ -8,12 +9,29 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.testng.util.Strings;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KustoSinkConfig extends AbstractConfig {
enum ErrorTolerance {
ALL, NONE;
/**
* Gets names of available error tolerance mode.
* @return array of available tolerance mode names
*/
public static String[] getNames() {
return Arrays
.stream(ErrorTolerance.class.getEnumConstants())
.map(Enum::name)
.toArray(String[]::new);
}
}
// TODO: this might need to be per kusto cluster...
static final String KUSTO_URL_CONF = "kusto.url";
private static final String KUSTO_URL_DOC = "Kusto ingestion service URI.";
@ -61,10 +79,39 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_COMMIT_IMMEDIATLY_DOC = "Whether kafka call to commit offsets will flush and commit the last offsets or only the ingested ones\"";
private static final String KUSTO_COMMIT_IMMEDIATLY_DISPLAY = "kusto.sink.commit";
static final String KUSTO_ERROR_TOLERANCE_CONF = "error.tolerance";
private static final String KUSTO_ERROR_TOLERANCE_DOC = "Error tolerance setting. "
+ "Must be configured to one of the following:\n"
+ "``NONE``\n"
+ "The Connector throws ConnectException and stops processing records "
+ "when an error occurs while processing or ingesting records into KustoDB.\n"
+ "``ALL``\n"
+ "Continues to process next subsequent records "
+ "when error occurs while processing or ingesting records into KustoDB.\n";
private static final String KUSTO_ERROR_TOLERANCE_DISPLAY = "Error Tolerance";
static final String KUSTO_RETRIES_COUNT_CONF = "kusto.sink.retries";
private static final String KUSTO_RETRIES_COUNT_DOC = "Number of retries on ingestions before throwing";
private static final String KUSTO_RETRIES_COUNT_DISPLAY = "kusto.sink.retries";
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "dlq.bootstrap.servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this with the address(es) of the Kafka brokers "
+ "to which the Connector should write failed records.";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Dead-Letter Queue Bootstrap Servers";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "dlq.topic.name";
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the name of the Kafka topic "
+ "to which failed records should be written. "
+ "By default, the Connector writes failed records to the kusto-dead-letter-queue topic. "
+ "The Connector will create the topic if it is not already present, with a replication factor of 1. "
+ "To use different topic settings, create the topic from the CLI before running the Connector.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Dead-Letter Queue Topic Name";
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "max.retry.time.ms";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time up to which the Connector "
+ "should retry writing records to KustoDB in case of failures.";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY = "Maximum Retry Time";
static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF = "retry.backoff.time.ms";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC = "Backoff time between the retry attempts "
+ "the Connector makes to ingest records into KustoDB.";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY = "Retry BackOff Time";
// Deprecated configs
static final String KUSTO_TABLES_MAPPING_CONF_DEPRECATED = "kusto.tables.topics_mapping";
@ -83,74 +130,83 @@ public class KustoSinkConfig extends AbstractConfig {
public static ConfigDef getConfig() {
final String connectionGroupName = "Connection";
final String writeGroupName = "Writes";
int connectionGroupOrder = 0;
int writeGroupOrder = 0;
//TODO: Add display name, validators, recommenders to configs.
return new ConfigDef()
final String connectionGroupName = "Connection";
final String writeGroupName = "Writes";
final String errorAndRetriesGroupName = "Error Handling and Retries";
int connectionGroupOrder = 0;
int writeGroupOrder = 0;
int errorAndRetriesGroupOrder = 0;
//TODO: Add display name, validators, recommenders to configs.
ConfigDef result = new ConfigDef();
defineConnectionConfigs(connectionGroupName, connectionGroupOrder, result);
defineWriteConfigs(writeGroupName, writeGroupOrder, result);
defineErrorHandlingAndRetriesConfigs(errorAndRetriesGroupName, errorAndRetriesGroupOrder, result);
return result;
}
private static void defineErrorHandlingAndRetriesConfigs(final String errorAndRetriesGroupName,
int errorAndRetriesGroupOrder, ConfigDef result) {
result
.define(
KUSTO_URL_CONF,
KUSTO_ERROR_TOLERANCE_CONF,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
Importance.HIGH,
KUSTO_URL_DOC,
connectionGroupName,
connectionGroupOrder++,
ErrorTolerance.NONE.name(),
ConfigDef.ValidString.in(ErrorTolerance.NONE.name(), ErrorTolerance.ALL.name()),
Importance.LOW,
KUSTO_ERROR_TOLERANCE_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_URL_DISPLAY)
KUSTO_ERROR_TOLERANCE_DISPLAY)
.define(
KUSTO_AUTH_USERNAME_CONF,
KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF,
Type.STRING,
null,
Importance.HIGH,
KUSTO_AUTH_USERNAME_DOC,
connectionGroupName,
connectionGroupOrder++,
Importance.LOW,
KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_USERNAME_DISPLAY)
KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY)
.define(
KUSTO_AUTH_PASSWORD_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_PASSWORD_DOC,
connectionGroupName,
connectionGroupOrder++,
KUSTO_DLQ_TOPIC_NAME_CONF,
Type.STRING,
"kusto-dead-letter-queue",
Importance.LOW,
KUSTO_DLQ_TOPIC_NAME_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_PASSWORD_DISPLAY)
KUSTO_DLQ_TOPIC_NAME_DISPLAY)
.define(
KUSTO_AUTH_APPKEY_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_APPKEY_DOC,
connectionGroupName,
connectionGroupOrder++,
KUSTO_SINK_MAX_RETRY_TIME_MS_CONF,
Type.LONG,
TimeUnit.SECONDS.toMillis(300),
Importance.LOW,
KUSTO_SINK_MAX_RETRY_TIME_MS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_APPKEY_DISPLAY)
KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY)
.define(
KUSTO_AUTH_APPID_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_APPID_DOC,
connectionGroupName,
connectionGroupOrder++,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF,
Type.LONG,
TimeUnit.SECONDS.toMillis(10),
Importance.LOW,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_APPID_DISPLAY)
.define(
KUSTO_AUTH_AUTHORITY_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_AUTHORITY_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_AUTHORITY_DISPLAY)
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY);
}
private static void defineWriteConfigs(final String writeGroupName, int writeGroupOrder,
ConfigDef result) {
result
.define(
KUSTO_TABLES_MAPPING_CONF,
Type.STRING,
@ -234,17 +290,72 @@ public class KustoSinkConfig extends AbstractConfig {
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_COMMIT_IMMEDIATLY_DISPLAY)
KUSTO_COMMIT_IMMEDIATLY_DISPLAY);
}
private static void defineConnectionConfigs(final String connectionGroupName,
int connectionGroupOrder, ConfigDef result) {
result
.define(
KUSTO_RETRIES_COUNT_CONF,
Type.INT,
2,
Importance.LOW,
KUSTO_RETRIES_COUNT_DOC,
writeGroupName,
writeGroupOrder++,
KUSTO_URL_CONF,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
Importance.HIGH,
KUSTO_URL_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_RETRIES_COUNT_DISPLAY);
KUSTO_URL_DISPLAY)
.define(
KUSTO_AUTH_USERNAME_CONF,
Type.STRING,
null,
Importance.HIGH,
KUSTO_AUTH_USERNAME_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_USERNAME_DISPLAY)
.define(
KUSTO_AUTH_PASSWORD_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_PASSWORD_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_PASSWORD_DISPLAY)
.define(
KUSTO_AUTH_APPKEY_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_APPKEY_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_APPKEY_DISPLAY)
.define(
KUSTO_AUTH_APPID_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_APPID_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_APPID_DISPLAY)
.define(
KUSTO_AUTH_AUTHORITY_CONF,
Type.PASSWORD,
"",
Importance.HIGH,
KUSTO_AUTH_AUTHORITY_DOC,
connectionGroupName,
connectionGroupOrder++,
Width.MEDIUM,
KUSTO_AUTH_AUTHORITY_DISPLAY);
}
public String getKustoUrl() {
@ -300,10 +411,28 @@ public class KustoSinkConfig extends AbstractConfig {
return this.getBoolean(KUSTO_COMMIT_IMMEDIATLY_CONF);
}
public int getKustoRetriesCount() {
return this.getInt(KUSTO_RETRIES_COUNT_CONF);
public ErrorTolerance getErrorTolerance() {
return ErrorTolerance.valueOf(
this.getString(KUSTO_ERROR_TOLERANCE_CONF).toUpperCase());
}
public String getDlqBootstrapServers() {
return this.getString(KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF);
}
public String getDlqTopicName() {
return this.getString(KUSTO_DLQ_TOPIC_NAME_CONF);
}
public long getMaxRetryAttempts() {
return this.getLong(KUSTO_SINK_MAX_RETRY_TIME_MS_CONF)
/ this.getLong(KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF);
}
public long getRetryBackOffTimeMs() {
return this.getLong(KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF);
}
public static void main(String[] args) {
System.out.println(getConfig().toEnrichedRst());
}
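A hedged usage sketch of the new error-handling getters (illustrative only; it assumes the remaining required connection and mapping settings are also supplied, and reuses the property values from the README example):
```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not part of this commit.
Map<String, String> props = new HashMap<>();
props.put("kusto.url", "https://ingest-mycluster.kusto.windows.net/");
props.put("error.tolerance", "ALL");
props.put("dlq.bootstrap.servers", "localhost:9092");
props.put("dlq.topic.name", "test-topic-error");
props.put("max.retry.time.ms", "60000");
props.put("retry.backoff.time.ms", "5000");
// ... plus the remaining required settings (topics mapping, auth) ...

KustoSinkConfig config = new KustoSinkConfig(props);
config.getErrorTolerance();      // ErrorTolerance.ALL
config.getDlqTopicName();        // "test-topic-error"
config.getMaxRetryAttempts();    // 60000 / 5000 = 12
config.getRetryBackOffTimeMs();  // 5000
```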

View file

@ -53,11 +53,12 @@ public class KustoSinkTask extends SinkTask {
private final Set<TopicPartition> assignment;
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
private long maxFileSize;
private long flushInterval;
private String tempDir;
//private long maxFileSize;
//private long flushInterval;
//private String tempDir;
private boolean commitImmediately;
private int retiresCount;
private KustoSinkConfig config;
IngestClient kustoIngestClient;
Map<TopicPartition, TopicPartitionWriter> writers;
@ -290,7 +291,8 @@ public class KustoSinkTask extends SinkTask {
if (ingestionProps == null) {
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped for the topic: %s. please check your configuration.", tp.topic()));
} else {
TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient, ingestionProps, tempDir, maxFileSize, flushInterval, commitImmediately, retiresCount);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, kustoIngestClient,
ingestionProps, commitImmediately, config);
writer.open();
writers.put(tp, writer);
@ -307,7 +309,7 @@ public class KustoSinkTask extends SinkTask {
writers.remove(tp);
assignment.remove(tp);
} catch (ConnectException e) {
log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
log.error("Error closing writer for {}. Error: {}", tp, e);
}
}
}
@ -316,7 +318,7 @@ public class KustoSinkTask extends SinkTask {
@Override
public void start(Map<String, String> props) {
KustoSinkConfig config = new KustoSinkConfig(props);
config = new KustoSinkConfig(props);
String url = config.getKustoUrl();
validateTableMappings(config);
@ -326,11 +328,7 @@ public class KustoSinkTask extends SinkTask {
// this should be read properly from settings
kustoIngestClient = createKustoIngestClient(config);
tempDir = config.getTempDirPath();
maxFileSize = config.getFlushSizeBytes();
flushInterval = config.getFlushInterval();
commitImmediately = config.getKustoCommitImmediatly();
retiresCount = config.getKustoRetriesCount();
log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
url, topicsToIngestionProps.keySet().toString()));
@ -341,7 +339,7 @@ public class KustoSinkTask extends SinkTask {
}
@Override
public void stop() throws ConnectException {
public void stop() {
log.warn("Stopping KustoSinkTask");
for (TopicPartitionWriter writer : writers.values()) {
writer.close();
@ -356,7 +354,7 @@ public class KustoSinkTask extends SinkTask {
}
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
public void put(Collection<SinkRecord> records) {
log.debug("put '"+ records.size() + "' num of records");
int i = 0;
SinkRecord lastRecord = null;
@ -422,7 +420,7 @@ public class KustoSinkTask extends SinkTask {
}
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
// do nothing , rolling files can handle writing
}
}

View file

@ -1,6 +1,10 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
public class SourceFile {
long rawBytes = 0;
@ -8,4 +12,5 @@ public class SourceFile {
long numRecords = 0;
public String path;
public File file;
public List<SinkRecord> records = new ArrayList<>();
}

View file

@ -2,9 +2,14 @@ package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
@ -12,15 +17,20 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.util.Strings;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
private final CompressionType eventDataCompression;
private final TopicPartition tp;
private final IngestClient client;
@ -32,58 +42,104 @@ class TopicPartitionWriter {
FileWriter fileWriter;
long currentOffset;
Long lastCommittedOffset;
private int defaultRetriesCount;
private int currentRetries;
private ReentrantReadWriteLock reentrantReadWriteLock;
private final long maxRetryAttempts;
private final long retryBackOffTime;
private final boolean isDlqEnabled;
private final String dlqTopicName;
private final Producer<byte[], byte[]> kafkaProducer;
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps, String basePath,
long fileThreshold, long flushInterval, boolean commitImmediatly, int retriesCount) {
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
boolean commitImmediatly, KustoSinkConfig config)
{
this.tp = tp;
this.client = client;
this.ingestionProps = ingestionProps.ingestionProperties;
this.fileThreshold = fileThreshold;
this.basePath = basePath;
this.flushInterval = flushInterval;
this.fileThreshold = config.getFlushSizeBytes();
this.basePath = config.getTempDirPath();
this.flushInterval = config.getFlushInterval();
this.commitImmediately = commitImmediatly;
this.currentOffset = 0;
this.eventDataCompression = ingestionProps.eventDataCompression;
this.defaultRetriesCount = retriesCount;
this.currentRetries = retriesCount;
this.reentrantReadWriteLock = new ReentrantReadWriteLock(true);
this.maxRetryAttempts = config.getMaxRetryAttempts() + 1;
this.retryBackOffTime = config.getRetryBackOffTimeMs();
if (Strings.isNotNullAndNotEmpty(config.getDlqBootstrapServers())) {
isDlqEnabled = true;
dlqTopicName = config.getDlqTopicName();
Properties properties = new Properties();
properties.put("bootstrap.servers", config.getDlqBootstrapServers());
properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
kafkaProducer = new KafkaProducer<>(properties);
} else {
kafkaProducer = null;
isDlqEnabled = false;
dlqTopicName = null;
}
}
String handleRollFile(SourceFile fileDescriptor) {
public void handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
new ArrayList<>();
try {
client.ingestFromFile(fileSourceInfo, ingestionProps);
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
this.lastCommittedOffset = currentOffset;
currentRetries = defaultRetriesCount;
} catch (Exception e) {
log.error("Ingestion Failed for file : "+ fileDescriptor.file.getName() + ", message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e));
if (commitImmediately) {
if (currentRetries > 0) {
try {
// Default time for commit is 5 seconds timeout.
Thread.sleep(1500);
} catch (InterruptedException e1) {
log.error("Couldn't sleep !");
}
log.error("Ingestion Failed for file : " + fileDescriptor.file.getName() + ", defaultRetriesCount left '" + defaultRetriesCount + "'. message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e));
currentRetries--;
return handleRollFile(fileDescriptor);
} else {
currentRetries = defaultRetriesCount;
// Returning string will make the caller throw
return "Ingestion Failed for file : " + fileDescriptor.file.getName() + ", defaultRetriesCount left '" + defaultRetriesCount + "'. message: " + e.getMessage() + "\nException : " + ExceptionUtils.getStackTrace(e);
}
for (int retryAttempts = 0; true; retryAttempts++) {
try {
client.ingestFromFile(fileSourceInfo, ingestionProps);
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
this.lastCommittedOffset = currentOffset;
return; // success, stop retrying
} catch (IngestionClientException e) {
//retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, e, fileDescriptor);
} catch (IngestionServiceException e) {
// non-retriable and non-transient exceptions
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
throw new ConnectException("Unable to ingest reccords into KustoDB", e);
}
}
}
return null;
private void backOffForRemainingAttempts(int retryAttempts, IngestionClientException e, SourceFile fileDescriptor) {
if (retryAttempts < maxRetryAttempts) {
// RetryUtil can be deleted if exponential backOff is not required, currently using constant backOff.
// long sleepTimeMs = RetryUtil.computeExponentialBackOffWithJitter(retryAttempts, TimeUnit.SECONDS.toMillis(5));
long sleepTimeMs = retryBackOffTime;
log.error("Failed to ingest records into KustoDB, backing off and retrying ingesting records after {} milliseconds.", sleepTimeMs);
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException interruptedErr) {
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e);
}
} else {
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e);
}
}
public void sendFailedRecordToDlq(SinkRecord record) {
byte[] recordKey = String.format("Failed to write record to KustoDB with the following kafka coordinates, "
+ "topic=%s, partition=%s, offset=%s.",
record.topic(),
record.kafkaPartition(),
record.kafkaOffset()).getBytes(StandardCharsets.UTF_8);
byte[] recordValue = record.value().toString().getBytes(StandardCharsets.UTF_8);
if (isDlqEnabled) {
ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(dlqTopicName, recordKey, recordValue);
try {
kafkaProducer.send(dlqRecord, (recordMetadata, exception) -> {
if (exception != null) {
log.error("Failed to write records to DLQ topic={}, exception={}",
dlqTopicName, exception);
}
});
} catch (IllegalStateException e) {
log.error("Failed to write records to DLQ topic, "
+ "kafka producer has already been closed. Exception={}", e);
}
}
}
String getFilePath(@Nullable Long offset) {
@ -119,7 +175,8 @@ class TopicPartitionWriter {
value = valueWithSeparator;
} else {
log.error(String.format("Unexpected value type, skipping record %s", record));
// TODO: add behavior on error logic
log.error(String.format("Unexpected value type, skipping record %s", record));
}
if (value == null) {
this.currentOffset = record.kafkaOffset();
@ -128,15 +185,14 @@ class TopicPartitionWriter {
reentrantReadWriteLock.readLock().lock();
// Current offset is saved after flushing for the flush timer to use
fileWriter.write(value, record.kafkaOffset());
fileWriter.write(value, record);
this.currentOffset = record.kafkaOffset();
} catch (ConnectException ex) {
if (commitImmediately) {
throw ex;
}
handleErrors(ex);
} catch (IOException ex) {
if (commitImmediately) {
throw new ConnectException("Got an IOExcption while writing to file with message:" + ex.getMessage());
throw new ConnectException("Exception while writing record "
+ "to the file to be ingested into KustoDB", ex);
}
} finally {
reentrantReadWriteLock.readLock().unlock();
@ -144,6 +200,14 @@ class TopicPartitionWriter {
}
}
private void handleErrors(ConnectException ex) {
if (commitImmediately) {
throw ex;
}
}
void open() {
// Should compress binary files
boolean shouldCompressData = shouldCompressData(this.ingestionProps, this.eventDataCompression);
@ -163,7 +227,12 @@ class TopicPartitionWriter {
fileWriter.rollback();
// fileWriter.close(); TODO ?
} catch (IOException e) {
e.printStackTrace();
log.error("Failed to rollback with exception={}", e);
}
try {
if (kafkaProducer != null) kafkaProducer.close(); // producer is null when no DLQ is configured
} catch (Exception e) {
log.error("Failed to close kafka producer={}", e);
}
}
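For completeness, a minimal sketch (not from this commit) of inspecting what `sendFailedRecordToDlq` writes: both key and value are plain UTF-8 bytes, with the key describing the failed record's Kafka coordinates. The topic name matches the README example; the group id is an assumption.
```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative DLQ inspection; "test-topic-error" matches the README example above.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "dlq-inspector"); // assumed group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("auto.offset.reset", "earliest");
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test-topic-error"));
    for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofSeconds(5))) {
        System.out.println(new String(r.key(), StandardCharsets.UTF_8)
                + " -> " + new String(r.value(), StandardCharsets.UTF_8));
    }
}
```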