SanchayGupta1197 2020-07-16 13:04:18 +05:30
Parent 38c9ed6865
Commit 476da15f15
4 changed files with 16 additions and 15 deletions

View file

@@ -77,6 +77,11 @@ behavior.on.error=FAIL
 misc.deadletterqueue.bootstrap.servers=localhost:9092
 misc.deadletterqueue.topic.name=test-topic-error
 errors.tolerance=all
+errors.deadletterqueue.topic.name=connect-dlq-topic
+errors.deadletterqueue.topic.replication.factor=1
+errors.deadletterqueue.context.headers.enable=true
+errors.retry.max.time.ms=60000
+errors.retry.backoff.time.ms=5000
 ````
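Records that exhaust their retries land on these dead-letter topics as ordinary Kafka messages, so they can be inspected with a plain consumer. A minimal sketch, assuming a local broker and the `test-topic-error` topic from the sample above (the class name and consumer group are hypothetical, not part of this commit):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker and topic taken from the sample config above; adjust for your setup.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq-inspector");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic-error"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Each record is a copy of a message the connector failed to deliver to Kusto.
                int size = record.value() == null ? 0 : record.value().length;
                System.out.printf("offset=%d, size=%d bytes%n", record.offset(), size);
            }
        }
    }
}
```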
@@ -118,7 +123,8 @@ KafkaTest | count
 >
+>Use `value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`
 #### Supported compressions
 All the records processed by the Connector (except for records having schema as bytearray) are `gzip` compressed after flushing them into a file, before ingesting the file into Kusto.
 #### Avro example
 One can use this gist [FilesKafkaProducer](https://gist.github.com/ohadbitt/8475dc9f63df1c0d0bc322e9b00fdd00) to create
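The gist itself is not reproduced in this diff; a rough sketch of the idea — sending a whole file as a single byte-array message, which `ByteArrayConverter` then hands to the connector unchanged — might look like this (the topic name and file path are placeholders, not the gist's actual code):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class FileBytesProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Read a complete Avro file and send it as one message; on the sink side,
        // ByteArrayConverter passes the raw bytes through to the connector as-is.
        byte[] fileBytes = Files.readAllBytes(Paths.get("data/sample.avro"));
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testing1", fileBytes)).get();
        }
    }
}
```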

View file

@@ -20,6 +20,11 @@ tasks.max=1
 #misc.deadletterqueue.bootstrap.servers=localhost:9092
 #misc.deadletterqueue.topic.name=test-topic-error
 #errors.tolerance=all
+#errors.deadletterqueue.topic.name=connect-dlq-topic
+#errors.deadletterqueue.topic.replication.factor=1
+#errors.deadletterqueue.context.headers.enable=true
+#errors.retry.max.time.ms=60000
+#errors.retry.backoff.time.ms=5000
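Taken together, `errors.retry.max.time.ms=60000` and `errors.retry.backoff.time.ms=5000` bound the total time spent retrying a failing record. A toy illustration of the arithmetic, under the assumption of a fixed backoff between attempts (the connector's exact retry policy is not shown in this diff):

```java
public class RetryBudget {
    public static void main(String[] args) {
        long maxRetryTimeMs = 60_000; // errors.retry.max.time.ms
        long backoffTimeMs = 5_000;   // errors.retry.backoff.time.ms

        // Assuming one attempt per fixed backoff interval (an assumption, not
        // confirmed by this commit), the budget allows at most
        // maxRetryTimeMs / backoffTimeMs retries before the record is failed
        // and, with the DLQ settings above, routed to the dead-letter topic.
        long attempts = maxRetryTimeMs / backoffTimeMs;
        System.out.println("Retry budget allows at most " + attempts + " attempts");
    }
}
```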

View file

@@ -92,13 +92,13 @@ public class KustoSinkConfig extends AbstractConfig {
     static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
     private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
-        + "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster to. "
+        + "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster. "
         + "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
     private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers";
     static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name";
     private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the Kafka topic's name "
-        + "to which the records failed due to network interruptions or unavailability of Kusto cluster are to be sinked.";
+        + "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster.";
     private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name";
     static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
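For context, keys like these are typically registered in the class's `ConfigDef` elsewhere in the file. A sketch of what that registration might look like, using Kafka's standard `ConfigDef` API (the actual types, defaults, and importance levels in `KustoSinkConfig` are not part of this diff):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Hypothetical registration of the miscellaneous DLQ keys; the real
// ConfigDef in KustoSinkConfig may use different defaults and groups.
public class DlqConfigDefSketch {
    static ConfigDef defineDlqKeys(ConfigDef configDef) {
        return configDef
            .define("misc.deadletterqueue.bootstrap.servers",
                    Type.LIST, "", Importance.LOW,
                    "Kafka brokers to which records that failed due to network "
                    + "interruptions or Kusto unavailability are written.")
            .define("misc.deadletterqueue.topic.name",
                    Type.STRING, "", Importance.LOW,
                    "Topic that receives those failed records.");
    }
}
```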

View file

@@ -7,17 +7,10 @@ import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
 import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
 import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
 import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
-import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.AvroRecordWriterProvider;
-import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.ByteRecordWriterProvider;
-import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider;
-import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.StringRecordWriterProvider;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -28,14 +21,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
-import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 class TopicPartitionWriter {
     private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
+    private static final String COMPRESSION_EXTENSION = ".gz";
     private final TopicPartition tp;
     private final IngestClient client;
@@ -154,9 +146,7 @@ class TopicPartitionWriter {
         offset = offset == null ? currentOffset : offset;
         long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
-        String compressionExtension = ".gz";
-        return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
+        return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), COMPRESSION_EXTENSION)).toString();
     }
     void writeRecord(SinkRecord record) throws ConnectException {
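The change above only lifts the hard-coded `".gz"` into the new `COMPRESSION_EXTENSION` constant; the produced path is unchanged. A quick illustration of the naming scheme with made-up values (topic `KafkaTest`, partition 0, offset 42, format `json`; the base path is hypothetical):

```java
import java.nio.file.Paths;

public class FileNameDemo {
    private static final String COMPRESSION_EXTENSION = ".gz";

    public static void main(String[] args) {
        String basePath = "/tmp/kusto-sink"; // hypothetical temp directory
        String fileName = String.format("kafka_%s_%s_%d.%s%s",
                "KafkaTest", 0, 42L, "json", COMPRESSION_EXTENSION);
        // Prints: /tmp/kusto-sink/kafka_KafkaTest_0_42.json.gz
        System.out.println(Paths.get(basePath, fileName));
    }
}
```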