From 476da15f15bde26f8cb35ca7a8f81fdb81928815 Mon Sep 17 00:00:00 2001
From: SanchayGupta1197
Date: Thu, 16 Jul 2020 13:04:18 +0530
Subject: [PATCH] resolve PR comments

---
 README.md                                         |  8 +++++++-
 connect-kusto-sink.properties                     |  5 +++++
 .../kusto/kafka/connect/sink/KustoSinkConfig.java |  4 ++--
 .../kafka/connect/sink/TopicPartitionWriter.java  | 14 ++------------
 4 files changed, 16 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 88f41bd..35fa5f1 100644
--- a/README.md
+++ b/README.md
@@ -77,6 +77,11 @@ behavior.on.error=FAIL
 misc.deadletterqueue.bootstrap.servers=localhost:9092
 misc.deadletterqueue.topic.name=test-topic-error
 
+errors.tolerance=all
+errors.deadletterqueue.topic.name=connect-dlq-topic
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+
 errors.retry.max.time.ms=60000
 errors.retry.backoff.time.ms=5000
 ````
@@ -118,7 +123,8 @@ KafkaTest | count
 
 >Use `value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`
 
-
+#### Supported compressions
+All records processed by the Connector (except records whose schema is bytearray) are `gzip`-compressed when flushed to a file, before the file is ingested into Kusto.
 
 #### Avro example
 One can use this gist [FilesKafkaProducer]("https://gist.github.com/ohadbitt/8475dc9f63df1c0d0bc322e9b00fdd00") to create
diff --git a/connect-kusto-sink.properties b/connect-kusto-sink.properties
index 00e77f5..f216c0e 100644
--- a/connect-kusto-sink.properties
+++ b/connect-kusto-sink.properties
@@ -20,6 +20,11 @@ tasks.max=1
 #misc.deadletterqueue.bootstrap.servers=localhost:9092
 #misc.deadletterqueue.topic.name=test-topic-error
 
+#errors.tolerance=all
+#errors.deadletterqueue.topic.name=connect-dlq-topic
+#errors.deadletterqueue.topic.replication.factor=1
+#errors.deadletterqueue.context.headers.enable=true
+
 #errors.retry.max.time.ms=60000
 #errors.retry.backoff.time.ms=5000
 
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
index 4a6b9b4..3069b30 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkConfig.java
@@ -92,13 +92,13 @@ public class KustoSinkConfig extends AbstractConfig {
 
     static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
     private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
-            + "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster to. "
+            + "to which the Connector should write records that failed due to network interruptions or unavailability of the Kusto cluster. "
             + "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
"; private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers"; static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name"; private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the Kafka topic's name " - + "to which the records failed due to network interruptions or unavailability of Kusto cluster are to be sinked."; + + "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster."; private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name"; static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms"; diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java index ba06cff..86c8491 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java @@ -7,17 +7,10 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError; -import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider; -import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.AvroRecordWriterProvider; -import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.ByteRecordWriterProvider; -import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider; -import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.StringRecordWriterProvider; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; @@ -28,14 +21,13 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Paths; -import java.util.Map; -import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; class TopicPartitionWriter { private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class); + private static final String COMPRESSION_EXTENSION = ".gz"; private final TopicPartition tp; private final IngestClient client; @@ -154,9 +146,7 @@ class TopicPartitionWriter { offset = offset == null ? currentOffset : offset; long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset; - String compressionExtension = ".gz"; - - return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString(); + return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), COMPRESSION_EXTENSION)).toString(); } void writeRecord(SinkRecord record) throws ConnectException {