
Merge pull request #15 from anmolbansal144/format-support

Updated outstream implementation, misc.deadletterqueue config and removed eventDataCompression
This commit is contained in:
Fahad Sheikh 2020-07-16 14:30:48 +05:30 committed by GitHub
Parent 77fce2ac4e ea1e0a5935
Commit e405151122
No key matching this signature was found
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 107 additions and 181 deletions

View file

@ -60,7 +60,7 @@ value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=1
topics=testing1,testing2
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
kusto.url=https://ingest-mycluster.kusto.windows.net/
@ -74,8 +74,13 @@ flush.interval.ms=300000
behavior.on.error=FAIL
dlq.bootstrap.servers=localhost:9092
dlq.topic.name=test-topic-error
misc.deadletterqueue.bootstrap.servers=localhost:9092
misc.deadletterqueue.topic.name=test-topic-error
errors.tolerance=all
errors.deadletterqueue.topic.name=connect-dlq-topic
errors.deadletterqueue.topic.replication.factor=1
errors.deadletterqueue.context.headers.enable=true
errors.retry.max.time.ms=60000
errors.retry.backoff.time.ms=5000
@ -112,17 +117,14 @@ KafkaTest | count
#### Supported formats
`csv`, `json`, `avro`, `apacheAvro`, `parquet`, `orc`, `tsv`, `scsv`, `sohsv`, `psv`, `txt`.
`csv`, `json`, `avro`, `apacheAvro`, `tsv`, `scsv`, `sohsv`, `psv`, `txt`.
> Note - `avro`, `apacheAvro`, `parquet` and `orc` files are sent each record (file) separately without aggregation, and are expected to be sent as a byte array containing the full file.
> Note - `avro` and `apacheAvro` files are sent record by record (each record is one complete file) without aggregation, and are expected to be sent as a byte array containing the full file.
>
>Use `value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`
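As an illustration of the byte-array requirement above, a producer reads a complete Avro container file and sends its bytes as a single record value, while the connector side uses `ByteArrayConverter`. A minimal sketch, assuming a local broker and an illustrative file and topic (this is not the connector's own code; the `FilesKafkaProducer` gist referenced further down does something similar):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class AvroFileProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                  // illustrative broker
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // The whole .avro container file becomes the value of a single record.
        byte[] fileBytes = Files.readAllBytes(Paths.get("example.avro"));   // illustrative file

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("testing1", fileBytes)).get();
        }
    }
}
```

On the connector side, the matching topic mapping would use `'format': 'avro'` together with `value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`.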
#### Supported compressions
Kusto Kafka connector can get compressed data, this can be specified in the topics_mapping in the configuration under
`eventDataCompression`, this can get all the compression types kusto accepts. Using this configuration, files don't get aggregated in the connector and are sent straight for ingestion.
#### Supported compressions
All records processed by the Connector (except records whose schema is a byte array) are `gzip` compressed after being flushed into a file, before the file is ingested into Kusto.
#### Avro example
One can use this gist [FilesKafkaProducer]("https://gist.github.com/ohadbitt/8475dc9f63df1c0d0bc322e9b00fdd00") to create

View file

@ -5,7 +5,7 @@ tasks.max=1
#kusto.url=https://ingest-{cluster}.kusto.windows.net/
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
#aad.auth.appid=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#aad.auth.appkey=ZZZZZZZZZZZZZZZZZZZZZZZ
@ -17,8 +17,13 @@ tasks.max=1
#behavior.on.error=FAIL
#dlq.bootstrap.servers=localhost:9092
#dlq.topic.name=test-topic-error
#misc.deadletterqueue.bootstrap.servers=localhost:9092
#misc.deadletterqueue.topic.name=test-topic-error
#errors.tolerance=all
#errors.deadletterqueue.topic.name=connect-dlq-topic
#errors.deadletterqueue.topic.replication.factor=1
#errors.deadletterqueue.context.headers.enable=true
#errors.retry.max.time.ms=60000
#errors.retry.backoff.time.ms=5000

View file

@ -99,11 +99,13 @@
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
@ -148,12 +150,13 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>5.2.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-runtime</artifactId>
<version>5.3.0-ccs</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
</dependencies>
<repositories>

View file

@ -44,7 +44,6 @@ public class FileWriter implements Closeable {
private Timer timer;
private Consumer<SourceFile> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Function<Long, String> getFilePath;
private OutputStream outputStream;
private String basePath;
@ -66,7 +65,6 @@ public class FileWriter implements Closeable {
* @param fileThreshold - Max size, uncompressed bytes.
* @param onRollCallback - Callback to allow code to execute when rolling a file. Blocking code.
* @param getFilePath - Allow external resolving of file name.
* @param shouldCompressData - Should the FileWriter compress the incoming data.
* @param behaviorOnError - Either log, fail or ignore errors based on the mode.
*/
public FileWriter(String basePath,
@ -74,7 +72,6 @@ public class FileWriter implements Closeable {
Consumer<SourceFile> onRollCallback,
Function<Long, String> getFilePath,
long flushInterval,
boolean shouldCompressData,
ReentrantReadWriteLock reentrantLock,
IngestionProperties ingestionProps,
BehaviorOnError behaviorOnError) {
@ -83,7 +80,6 @@ public class FileWriter implements Closeable {
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.shouldCompressData = shouldCompressData;
this.behaviorOnError = behaviorOnError;
// This is a fair lock so that we flush close to the time intervals
@ -114,13 +110,11 @@ public class FileWriter implements Closeable {
FileOutputStream fos = new FileOutputStream(file);
currentFileDescriptor = fos.getFD();
fos.getChannel().truncate(0);
countingStream = new CountingOutputStream(fos);
fileProps.file = file;
currentFile = fileProps;
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, outputStream);
countingStream = new CountingOutputStream(new GZIPOutputStream(fos));
outputStream = countingStream.getOutputStream();
recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, countingStream);
}
void rotate(@Nullable Long offset) throws IOException, DataException {
@ -131,12 +125,8 @@ public class FileWriter implements Closeable {
void finishFile(Boolean delete) throws IOException, DataException {
if(isDirty()){
recordWriter.commit();
if(shouldCompressData){
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();
} else {
outputStream.flush();
}
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();
try {
onRollCallback.accept(currentFile);
} catch (ConnectException e) {
@ -168,7 +158,7 @@ public class FileWriter implements Closeable {
}
private void dumpFile() throws IOException {
outputStream.close();
countingStream.close();
currentFileDescriptor = null;
boolean deleted = currentFile.file.delete();
if (!deleted) {
@ -178,8 +168,8 @@ public class FileWriter implements Closeable {
}
public void rollback() throws IOException {
if (outputStream != null) {
outputStream.close();
if (countingStream != null) {
countingStream.close();
if (currentFile != null && currentFile.file != null) {
dumpFile();
}
@ -254,9 +244,9 @@ public class FileWriter implements Closeable {
resetFlushTimer(true);
}
recordWriter.write(record);
recordWriter.commit();
currentFile.records.add(record);
currentFile.rawBytes = recordWriter.getDataSize();
currentFile.zippedBytes += countingStream.numBytes;
currentFile.rawBytes = countingStream.numBytes;
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold || shouldWriteAvroAsBytes) {
rotate(record.kafkaOffset());
@ -294,9 +284,11 @@ public class FileWriter implements Closeable {
private class CountingOutputStream extends FilterOutputStream {
private long numBytes = 0;
private OutputStream outputStream;
CountingOutputStream(OutputStream out) {
super(out);
this.outputStream = out;
}
@Override
@ -316,6 +308,11 @@ public class FileWriter implements Closeable {
out.write(b, off, len);
this.numBytes += len;
}
public OutputStream getOutputStream() {
return outputStream;
}
}
}
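With `shouldCompressData` removed, the writer now always layers its streams the same way: the `RecordWriter` writes into the `CountingOutputStream`, which counts the uncompressed bytes and forwards them to a `GZIPOutputStream` over the target file, and `finishFile` finishes the inner gzip stream obtained via `getOutputStream()`. A self-contained sketch of that composition with a simplified counter (illustrative file name and payload, not the connector's actual classes):

```java
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipCountingSketch {

    /** Counts bytes as they are handed to the wrapped stream (i.e. before compression here). */
    static class CountingOutputStream extends FilterOutputStream {
        long numBytes = 0;

        CountingOutputStream(OutputStream out) {
            super(out);
        }

        @Override
        public void write(int b) throws IOException {
            out.write(b);
            numBytes++;
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            out.write(b, off, len);
            numBytes += len;
        }
    }

    public static void main(String[] args) throws IOException {
        // Same layering as the updated FileWriter: counter -> gzip -> file.
        FileOutputStream fos = new FileOutputStream("kafka_topic_0_0.json.gz");
        GZIPOutputStream gzip = new GZIPOutputStream(fos);
        CountingOutputStream countingStream = new CountingOutputStream(gzip);

        // The RecordWriter writes records into countingStream; plain JSON lines here.
        for (int i = 0; i < 100; i++) {
            countingStream.write(("{\"i\":" + i + "}\n").getBytes(StandardCharsets.UTF_8));
        }

        // numBytes is the uncompressed size fed to gzip -- what rawBytes is compared
        // against fileThreshold in the connector.
        System.out.println("uncompressed bytes written: " + countingStream.numBytes);

        // finishFile(): finish the gzip trailer before handing the file to ingestion.
        gzip.finish();
        countingStream.close();
    }
}
```

Because the counter sits above the gzip stream, `rawBytes` keeps measuring uncompressed data, which is what `fileThreshold` is documented to be compared against.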

View file

@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit;
public class KustoSinkConfig extends AbstractConfig {
private static final Logger log = LoggerFactory.getLogger(KustoSinkConfig.class);
private static final String DLQ_PROPS_PREFIX = "dlq.";
private static final String DLQ_PROPS_PREFIX = "misc.deadletterqueue.";
enum BehaviorOnError {
FAIL, LOG, IGNORE;
@ -90,16 +90,16 @@ public class KustoSinkConfig extends AbstractConfig {
+ "while processing records or ingesting records in Kusto table, available in connect logs.";
private static final String KUSTO_BEHAVIOR_ON_ERROR_DISPLAY = "Behavior On Error";
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "dlq.bootstrap.servers";
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "misc.deadletterqueue.bootstrap.servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this list to Kafka broker's address(es) "
+ "to which the Connector should write failed records to. "
+ "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster. "
+ "This list should be in the form host-1:port-1,host-2:port-2,…host-n:port-n. ";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Dead-Letter Queue Bootstrap Servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Miscellaneous Dead-Letter Queue Bootstrap Servers";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "dlq.topic.name";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "misc.deadletterqueue.topic.name";
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the Kafka topic's name "
+ "to which the failed records are to be sinked.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Dead-Letter Queue Topic Name";
+ "to which the Connector should write records failed due to network interruptions or unavailability of Kusto cluster.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Miscellaneous Dead-Letter Queue Topic Name";
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time upto which the Connector "
@ -333,7 +333,8 @@ public class KustoSinkConfig extends AbstractConfig {
} else if (getDlqBootstrapServers().isEmpty() && Strings.isNullOrEmpty(getDlqTopicName())) {
return false;
} else {
throw new ConfigException("To enable DLQ configuration please configure both `dlq.bootstrap.servers` and `dlq.topic.name` configurations ");
throw new ConfigException("To enable Miscellaneous Dead-Letter Queue configuration please configure both " +
"`misc.deadletterqueue.bootstrap.servers` and `misc.deadletterqueue.topic.name` configurations ");
}
}
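With `DLQ_PROPS_PREFIX` now set to `misc.deadletterqueue.`, any producer setting supplied under that prefix (the config test further down passes `misc.deadletterqueue.security.protocol` and `misc.deadletterqueue.sasl.mechanism`) is forwarded to the dead-letter-queue producer with the prefix stripped. A minimal sketch of that pass-through pattern over a plain map; the connector's actual `getDlqProps()` may differ in detail:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DlqPropsSketch {

    private static final String DLQ_PROPS_PREFIX = "misc.deadletterqueue.";

    /** Strip DLQ_PROPS_PREFIX and collect the remainder as producer properties. */
    static Properties dlqProducerProps(Map<String, String> connectorConfig) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : connectorConfig.entrySet()) {
            if (e.getKey().startsWith(DLQ_PROPS_PREFIX)) {
                props.put(e.getKey().substring(DLQ_PROPS_PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("misc.deadletterqueue.bootstrap.servers", "localhost:9092");
        config.put("misc.deadletterqueue.security.protocol", "SASL_PLAINTEXT");
        config.put("misc.deadletterqueue.sasl.mechanism", "PLAIN");
        config.put("kusto.url", "https://ingest-mycluster.kusto.windows.net/"); // not prefixed, ignored

        // -> {bootstrap.servers=localhost:9092, security.protocol=SASL_PLAINTEXT, sasl.mechanism=PLAIN}
        System.out.println(dlqProducerProps(config));
    }
}
```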

View file

@ -10,7 +10,6 @@ import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -137,8 +136,7 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString("table");
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
IngestionProperties props = new IngestionProperties(db, table);
if (format != null && !format.isEmpty()) {
@ -171,7 +169,6 @@ public class KustoSinkTask extends SinkTask {
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.eventDataCompression = compressionType;
topicIngestionProperties.ingestionProperties = props;
result.put(mapping.getString("topic"), topicIngestionProperties);
}
@ -193,9 +190,13 @@ public class KustoSinkTask extends SinkTask {
Client engineClient = createKustoEngineClient(config);
if (config.getTopicToTableMapping() != null) {
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
if(mappings.length() > 0) {
if(isIngestorRole(mappings.getJSONObject(0), engineClient)) {
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
}
}
}
}
String tableAccessErrorMessage = "";
@ -221,6 +222,20 @@ public class KustoSinkTask extends SinkTask {
}
}
private boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
String database = testMapping.getString("db");
String table = testMapping.getString("table");
try {
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
} catch(DataServiceException | DataClientException err){
if(err.getCause().getMessage().contains("Forbidden:")){
log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings ");
return false;
}
}
return true;
}
/**
* This function validates whether the user has the read and write access to the intended table
* before starting to sink records into ADX.
@ -232,8 +247,8 @@ public class KustoSinkTask extends SinkTask {
String database = mapping.getString("db");
String table = mapping.getString("table");
String format = mapping.optString("format");
String mappingName = mapping.optString("mapping");
String format = mapping.getString("format");
String mappingName = mapping.getString("mapping");
boolean hasAccess = false;
try {
try {
@ -243,7 +258,6 @@ public class KustoSinkTask extends SinkTask {
}
} catch (DataServiceException e) {
hasAccess = false;
databaseTableErrorList.add(String.format("Database:%s Table:%s | table not found", database, table));
}
if(hasAccess) {
@ -326,11 +340,11 @@ public class KustoSinkTask extends SinkTask {
isDlqEnabled = true;
dlqTopicName = config.getDlqTopicName();
Properties properties = config.getDlqProps();
log.info("Initializing DLQ producer with the following properties: {}", properties.keySet());
log.info("Initializing miscellaneous dead-letter queue producer with the following properties: {}", properties.keySet());
try {
kafkaProducer = new KafkaProducer<>(properties);
} catch (Exception e) {
throw new ConnectException("Failed to initialize producer for dlq", e);
throw new ConnectException("Failed to initialize producer for miscellaneous dead-letter queue", e);
}
} else {
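`open()` builds the miscellaneous dead-letter-queue producer from those pass-through properties and later hands it to each `TopicPartitionWriter` as a `Producer<byte[], byte[]>`. A minimal initialization sketch; the byte-array serializers are an assumption here (a `byte[]`-typed producer needs them configured somewhere, whether in code or via the `misc.deadletterqueue.*` settings):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.connect.errors.ConnectException;

public class DlqProducerInitSketch {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");                   // from misc.deadletterqueue.*
        properties.put("key.serializer", ByteArraySerializer.class.getName());   // assumption: byte[] keys
        properties.put("value.serializer", ByteArraySerializer.class.getName()); // assumption: byte[] values

        Producer<byte[], byte[]> kafkaProducer;
        try {
            kafkaProducer = new KafkaProducer<>(properties);
        } catch (Exception e) {
            throw new ConnectException("Failed to initialize producer for miscellaneous dead-letter queue", e);
        }
        kafkaProducer.close();
    }
}
```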

View file

@ -8,7 +8,6 @@ import org.apache.kafka.connect.sink.SinkRecord;
public class SourceFile {
long rawBytes = 0;
long zippedBytes = 0;
long numRecords = 0;
public String path;
public File file;

View file

@ -1,10 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
class TopicIngestionProperties {
IngestionProperties ingestionProperties;
CompressionType eventDataCompression = null;
}

View file

@ -4,21 +4,13 @@ import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.AvroRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.ByteRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.JsonRecordWriterProvider;
import com.microsoft.azure.kusto.kafka.connect.sink.formatWriter.StringRecordWriterProvider;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
@ -29,16 +21,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
private static final String COMPRESSION_EXTENSION = ".gz";
private final CompressionType eventDataCompression;
private final TopicPartition tp;
private final IngestClient client;
private final IngestionProperties ingestionProps;
@ -57,7 +47,7 @@ class TopicPartitionWriter {
private final BehaviorOnError behaviorOnError;
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> kafkaProducer)
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer)
{
this.tp = tp;
this.client = client;
@ -66,14 +56,13 @@ class TopicPartitionWriter {
this.basePath = config.getTempDirPath();
this.flushInterval = config.getFlushInterval();
this.currentOffset = 0;
this.eventDataCompression = ingestionProps.eventDataCompression;
this.reentrantReadWriteLock = new ReentrantReadWriteLock(true);
this.maxRetryAttempts = config.getMaxRetryAttempts() + 1;
this.retryBackOffTime = config.getRetryBackOffTimeMs();
this.behaviorOnError = config.getBehaviorOnError();
this.isDlqEnabled = isDlqEnabled;
this.dlqTopicName = dlqTopicName;
this.kafkaProducer = kafkaProducer;
this.kafkaProducer = dlqProducer;
}
@ -116,14 +105,14 @@ class TopicPartitionWriter {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException interruptedErr) {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName);
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e);
}
} else {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName);
log.warn("Writing {} failed records to miscellaneous dead-letter queue topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e);
@ -142,12 +131,12 @@ class TopicPartitionWriter {
kafkaProducer.send(dlqRecord, (recordMetadata, exception) -> {
if (exception != null) {
throw new KafkaException(
String.format("Failed to write records to DLQ topic=%s.", dlqTopicName),
String.format("Failed to write records to miscellaneous dead-letter queue topic=%s.", dlqTopicName),
exception);
}
});
} catch (IllegalStateException e) {
log.error("Failed to write records to DLQ topic, "
log.error("Failed to write records to miscellaneous dead-letter queue topic, "
+ "kafka producer has already been closed. Exception={}", e);
}
}
@ -157,16 +146,7 @@ class TopicPartitionWriter {
offset = offset == null ? currentOffset : offset;
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
String compressionExtension = "";
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
if(eventDataCompression != null) {
compressionExtension = "." + eventDataCompression.toString();
} else {
compressionExtension = ".gz";
}
}
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), COMPRESSION_EXTENSION)).toString();
}
void writeRecord(SinkRecord record) throws ConnectException {
@ -198,15 +178,12 @@ class TopicPartitionWriter {
void open() {
// Should compress binary files
boolean shouldCompressData = shouldCompressData(this.ingestionProps, this.eventDataCompression);
fileWriter = new FileWriter(
basePath,
fileThreshold,
this::handleRollFile,
this::getFilePath,
flushInterval,
shouldCompressData,
reentrantReadWriteLock,
ingestionProps,
behaviorOnError);
@ -228,7 +205,4 @@ class TopicPartitionWriter {
}
}
static boolean shouldCompressData(IngestionProperties ingestionProps, CompressionType eventDataCompression) {
return (eventDataCompression == null);
}
}
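With compression no longer configurable per topic, `getFilePath` always appends `COMPRESSION_EXTENSION`, so every rolled file is named `kafka_<topic>_<partition>_<offset>.<format>.gz`. A tiny sketch mirroring that `String.format` call with illustrative values:

```java
import java.nio.file.Paths;

public class FilePathSketch {
    public static void main(String[] args) {
        String basePath = "/tmp/kusto-sink";   // illustrative temp dir
        String topic = "testing1";
        int partition = 0;
        long nextOffset = 42;
        String dataFormat = "json";
        String compressionExtension = ".gz";   // always appended now

        String path = Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s",
                topic, partition, nextOffset, dataFormat, compressionExtension)).toString();

        System.out.println(path); // e.g. /tmp/kusto-sink/kafka_testing1_0_42.json.gz
    }
}
```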

View file

@ -23,10 +23,4 @@ public interface RecordWriter extends Closeable {
* close the writer.
*/
void commit();
/**
*
* @return Total length of file
*/
long getDataSize();
}

View file

@ -25,7 +25,6 @@ public class AvroRecordWriterProvider implements RecordWriterProvider {
return new RecordWriter() {
final DataFileWriter<Object> writer = new DataFileWriter<>(new GenericDatumWriter<>());
Schema schema;
long size =0;
@Override
public void write(SinkRecord record) throws IOException {
@ -43,7 +42,6 @@ public class AvroRecordWriterProvider implements RecordWriterProvider {
log.trace("Sink record: {}", record);
Object value = avroData.fromConnectData(schema, record.value());
size += value.toString().getBytes().length;
// AvroData wraps primitive types so their schema can be included. We need to unwrap
// NonRecordContainers to just their value to properly handle these types
if (value instanceof NonRecordContainer) {
@ -70,11 +68,6 @@ public class AvroRecordWriterProvider implements RecordWriterProvider {
throw new DataException(e);
}
}
@Override
public long getDataSize() {
return size;
}
};
}
}

View file

@ -17,19 +17,22 @@ public class ByteRecordWriterProvider implements RecordWriterProvider {
@Override
public RecordWriter getRecordWriter(String filename, OutputStream out) {
return new RecordWriter() {
long size =0;
@Override
public void write(SinkRecord record) throws IOException {
byte[] value = null;
byte[] valueBytes = (byte[]) record.value();
byte[] separator = "\n".getBytes(StandardCharsets.UTF_8);
byte[] valueWithSeparator = new byte[valueBytes.length + separator.length];
System.arraycopy(valueBytes, 0, valueWithSeparator, 0, valueBytes.length);
System.arraycopy(separator, 0, valueWithSeparator, valueBytes.length, separator.length);
value = valueWithSeparator;
if (filename.contains("avro")) {
value = new byte[valueBytes.length];
System.arraycopy(valueBytes, 0, value, 0, valueBytes.length);
} else {
byte[] separator = "\n".getBytes(StandardCharsets.UTF_8);
byte[] valueWithSeparator = new byte[valueBytes.length + separator.length];
System.arraycopy(valueBytes, 0, valueWithSeparator, 0, valueBytes.length);
System.arraycopy(separator, 0, valueWithSeparator, valueBytes.length, separator.length);
value = valueWithSeparator;
}
out.write(value);
size += value.length;
}
@Override
@ -49,11 +52,6 @@ public class ByteRecordWriterProvider implements RecordWriterProvider {
throw new DataException(e);
}
}
@Override
public long getDataSize() {
return size;
}
};
}
}
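The updated writer appends a newline separator only for non-Avro payloads: an Avro container file is a complete binary file sent as a single record (per the README note above), so writing anything after its bytes would corrupt it. A small sketch of that branch, detached from the connector classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ByteWriterBranchSketch {

    /** Mirrors the updated branch: avro payloads go out verbatim, others get a "\n" separator. */
    static void write(String filename, byte[] valueBytes, OutputStream out) throws IOException {
        if (filename.contains("avro")) {
            out.write(valueBytes);                              // full container file, no separator
        } else {
            out.write(valueBytes);
            out.write("\n".getBytes(StandardCharsets.UTF_8));   // one record per line
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream avroOut = new ByteArrayOutputStream();
        ByteArrayOutputStream textOut = new ByteArrayOutputStream();

        write("kafka_t_0_1.avro.gz", "<avro bytes>".getBytes(StandardCharsets.UTF_8), avroOut);
        write("kafka_t_0_1.txt.gz", "hello".getBytes(StandardCharsets.UTF_8), textOut);

        System.out.println(avroOut.size()); // 12 -- payload unchanged
        System.out.println(textOut.size()); // 6  -- payload plus newline
    }
}
```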

View file

@ -23,8 +23,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
private static final String LINE_SEPARATOR = System.lineSeparator();
private static final byte[] LINE_SEPARATOR_BYTES
= LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8);
private static final long LINE_SEPARATOR_BYTES_LENGTH
= LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8).length;
private final ObjectMapper mapper = new ObjectMapper();
private final JsonConverter converter = new JsonConverter();
@ -44,7 +42,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
final JsonGenerator writer = mapper.getFactory()
.createGenerator(out)
.setRootValueSeparator(null);
long size =0;
@Override
public void write(SinkRecord record) {
log.trace("Sink record: {}", record);
@ -62,7 +59,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
writer.writeObject(value);
writer.writeRaw(LINE_SEPARATOR);
}
size+= (value.toString().getBytes().length + LINE_SEPARATOR_BYTES_LENGTH);
} catch (IOException e) {
throw new ConnectException(e);
}
@ -77,11 +73,6 @@ public class JsonRecordWriterProvider implements RecordWriterProvider {
}
}
@Override
public long getDataSize() {
return size;
}
@Override
public void close() {
try {

View file

@ -16,14 +16,12 @@ public class StringRecordWriterProvider implements RecordWriterProvider {
@Override
public RecordWriter getRecordWriter(String filename, OutputStream out) {
return new RecordWriter() {
long size =0;
@Override
public void write(SinkRecord record) throws IOException {
byte[] value = null;
value = String.format("%s\n", record.value()).getBytes(StandardCharsets.UTF_8);
out.write(value);
size += value.length;
}
@Override
@ -43,11 +41,6 @@ public class StringRecordWriterProvider implements RecordWriterProvider {
throw new DataException(e);
}
}
@Override
public long getDataSize() {
return size;
}
};
}

View file

@ -77,7 +77,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg = "Line number 1: This is a message from the other size";
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
fileWriter.initializeRecordWriter(record);
@ -108,7 +108,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@ -149,7 +149,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg = "Message";
SinkRecord record = new SinkRecord("topic", 1, null, null, null, msg, 10);
@ -168,7 +168,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg2 = "Second Message";
SinkRecord record1 = new SinkRecord("topic", 1, null, null, null, msg2, 10);
@ -217,7 +217,7 @@ public class FileWriterTest {
}
return Paths.get(path, Long.toString(offset)).toString();
};
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, reentrantReadWriteLock, ingestionProps, BehaviorOnError.FAIL);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
@ -259,37 +259,6 @@ public class FileWriterTest {
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
@Test
public void testFileWriterCompressed() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 128 * 2;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";
Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, byteArrayOutputStream.toByteArray(), 10);
fileWriter.writeData(record);
fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}
static Function<SourceFile, String> getAssertFileConsumerFunction(String msg) {
return (SourceFile f) -> {
try (FileInputStream fileInputStream = new FileInputStream(f.file)) {

View file

@ -112,8 +112,8 @@ public class KustoSinkConnectorConfigTest {
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put("dlq.security.protocol", "SASL_PLAINTEXT");
settings.put("dlq.sasl.mechanism", "PLAIN");
settings.put("misc.deadletterqueue.security.protocol", "SASL_PLAINTEXT");
settings.put("misc.deadletterqueue.sasl.mechanism", "PLAIN");
config = new KustoSinkConfig(settings);

View file

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@ -128,7 +127,7 @@ public class KustoSinkTaskTest {
public void getTable() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
@ -146,7 +145,6 @@ public class KustoSinkTaskTest {
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
Assert.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
}
}

View file

@ -38,14 +38,13 @@ public class AvroRecordWriterTest {
File file = new File("abc.avro");
AvroRecordWriterProvider writer = new AvroRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
OutputStream out=fos;
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(),out);
for(SinkRecord record : records){
rd.write(record);
}
rd.commit();
validate(file.getPath());
assertEquals(rd.getDataSize(),290);
file.delete();
}

View file

@ -27,7 +27,7 @@ public class ByteArrayWriterProviderTest {
File file = new File("abc.bin");
ByteRecordWriterProvider writer = new ByteRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
OutputStream out=fos;
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);
@ -40,7 +40,6 @@ public class ByteArrayWriterProviderTest {
assertEquals(st, String.format("hello-%s", i));
i++;
}
assertEquals(rd.getDataSize(),80);
file.delete();
}
}

View file

@ -30,7 +30,7 @@ public class JsonRecordWriterProviderTest {
File file = new File("abc.json");
JsonRecordWriterProvider jsonWriter = new JsonRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
OutputStream out=fos;
OutputStream out = fos;
RecordWriter rd = jsonWriter.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);
@ -43,7 +43,6 @@ public class JsonRecordWriterProviderTest {
assertEquals(st, String.format("{\"hello\":%s}", i));
i++;
}
assertEquals(rd.getDataSize(),100);
file.delete();
}
}

View file

@ -4,6 +4,7 @@ import com.microsoft.azure.kusto.kafka.connect.sink.format.RecordWriter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.FileOutputStream;
@ -26,7 +27,7 @@ public class StringRecordWriterProviderTest {
File file = new File("abc.txt");
StringRecordWriterProvider writer = new StringRecordWriterProvider();
FileOutputStream fos = new FileOutputStream(file);
OutputStream out=fos;
OutputStream out = fos;
RecordWriter rd = writer.getRecordWriter(file.getPath(), out);
for(SinkRecord record : records){
rd.write(record);
@ -39,7 +40,6 @@ public class StringRecordWriterProviderTest {
assertEquals(st, String.format("hello-%s", i));
i++;
}
assertEquals(rd.getDataSize(),80);
file.delete();
}