зеркало из
1
0
Форкнуть 0
This commit is contained in:
SanchayGupta1197 2020-07-14 18:25:41 +05:30
Родитель c990e58405
Коммит 2d53ba3542
9 изменённых файлов: 53 добавлений и 85 удалений

Просмотреть файл

@ -60,7 +60,7 @@ value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=1
topics=testing1,testing2
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
kusto.url=https://ingest-mycluster.kusto.windows.net/
@ -119,10 +119,6 @@ KafkaTest | count
>Use `value.converter=org.apache.kafka.connect.converters.ByteArrayConverter`
#### Supported compressions
Kusto Kafka connector can get compressed data, this can be specified in the topics_mapping in the configuration under
`eventDataCompression`, this can get all the compression types kusto accepts. Using this configuration, files don't get aggregated in the connector and are sent straight for ingestion.
#### Avro example
One can use this gist [FilesKafkaProducer]("https://gist.github.com/ohadbitt/8475dc9f63df1c0d0bc322e9b00fdd00") to create

Просмотреть файл

@ -5,7 +5,7 @@ tasks.max=1
#kusto.url=https://ingest-{cluster}.kusto.windows.net/
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
#kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping'}]
#aad.auth.appid=XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
#aad.auth.appkey=ZZZZZZZZZZZZZZZZZZZZZZZ

Просмотреть файл

@ -44,7 +44,6 @@ public class FileWriter implements Closeable {
private Timer timer;
private Consumer<SourceFile> onRollCallback;
private final long flushInterval;
private final boolean shouldCompressData;
private Function<Long, String> getFilePath;
private OutputStream outputStream;
private String basePath;
@ -66,7 +65,6 @@ public class FileWriter implements Closeable {
* @param fileThreshold - Max size, uncompressed bytes.
* @param onRollCallback - Callback to allow code to execute when rolling a file. Blocking code.
* @param getFilePath - Allow external resolving of file name.
* @param shouldCompressData - Should the FileWriter compress the incoming data.
* @param behaviorOnError - Either log, fail or ignore errors based on the mode.
*/
public FileWriter(String basePath,
@ -74,7 +72,6 @@ public class FileWriter implements Closeable {
Consumer<SourceFile> onRollCallback,
Function<Long, String> getFilePath,
long flushInterval,
boolean shouldCompressData,
ReentrantReadWriteLock reentrantLock,
IngestionProperties ingestionProps,
BehaviorOnError behaviorOnError) {
@ -83,7 +80,6 @@ public class FileWriter implements Closeable {
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.shouldCompressData = shouldCompressData;
this.behaviorOnError = behaviorOnError;
// This is a fair lock so that we flush close to the time intervals
@ -118,8 +114,7 @@ public class FileWriter implements Closeable {
countingStream = new CountingOutputStream(fos);
fileProps.file = file;
currentFile = fileProps;
outputStream = shouldCompressData ? new GZIPOutputStream(countingStream) : countingStream;
outputStream = new GZIPOutputStream(countingStream);
recordWriter = recordWriterProvider.getRecordWriter(currentFile.path, outputStream);
}
@ -131,12 +126,8 @@ public class FileWriter implements Closeable {
void finishFile(Boolean delete) throws IOException, DataException {
if(isDirty()){
recordWriter.commit();
if(shouldCompressData){
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();
} else {
outputStream.flush();
}
GZIPOutputStream gzip = (GZIPOutputStream) outputStream;
gzip.finish();
try {
onRollCallback.accept(currentFile);
} catch (ConnectException e) {

Просмотреть файл

@ -10,7 +10,6 @@ import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionMapping;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -137,8 +136,7 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString("table");
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
IngestionProperties props = new IngestionProperties(db, table);
if (format != null && !format.isEmpty()) {
@ -171,7 +169,6 @@ public class KustoSinkTask extends SinkTask {
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.eventDataCompression = compressionType;
topicIngestionProperties.ingestionProperties = props;
result.put(mapping.getString("topic"), topicIngestionProperties);
}

Просмотреть файл

@ -1,10 +1,8 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
class TopicIngestionProperties {
IngestionProperties ingestionProperties;
CompressionType eventDataCompression = null;
}

Просмотреть файл

@ -4,7 +4,6 @@ import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionClientException;
import com.microsoft.azure.kusto.ingest.exceptions.IngestionServiceException;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConfig.BehaviorOnError;
@ -38,7 +37,6 @@ class TopicPartitionWriter {
private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
private final CompressionType eventDataCompression;
private final TopicPartition tp;
private final IngestClient client;
private final IngestionProperties ingestionProps;
@ -66,7 +64,6 @@ class TopicPartitionWriter {
this.basePath = config.getTempDirPath();
this.flushInterval = config.getFlushInterval();
this.currentOffset = 0;
this.eventDataCompression = ingestionProps.eventDataCompression;
this.reentrantReadWriteLock = new ReentrantReadWriteLock(true);
this.maxRetryAttempts = config.getMaxRetryAttempts() + 1;
this.retryBackOffTime = config.getRetryBackOffTimeMs();
@ -157,14 +154,7 @@ class TopicPartitionWriter {
offset = offset == null ? currentOffset : offset;
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
String compressionExtension = "";
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
if(eventDataCompression != null) {
compressionExtension = "." + eventDataCompression.toString();
} else {
compressionExtension = ".gz";
}
}
String compressionExtension = ".gz";
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
}
@ -198,15 +188,12 @@ class TopicPartitionWriter {
void open() {
// Should compress binary files
boolean shouldCompressData = shouldCompressData(this.ingestionProps, this.eventDataCompression);
fileWriter = new FileWriter(
basePath,
fileThreshold,
this::handleRollFile,
this::getFilePath,
flushInterval,
shouldCompressData,
reentrantReadWriteLock,
ingestionProps,
behaviorOnError);
@ -228,7 +215,4 @@ class TopicPartitionWriter {
}
}
static boolean shouldCompressData(IngestionProperties ingestionProps, CompressionType eventDataCompression) {
return (eventDataCompression == null);
}
}

Просмотреть файл

@ -36,11 +36,11 @@ import java.util.logging.Logger;
public class E2ETest {
private static final String testPrefix = "tmpKafkaE2ETest";
private String appId = System.getProperty("appId");
private String appKey = System.getProperty("appKey");
private String authority = System.getProperty("authority");
private String cluster = System.getProperty("cluster");
private String database = System.getProperty("database");
private String appId = System.getProperty("appId","e299f8c0-4965-4b1f-9b55-a56dd4b8f6c4");
private String appKey = System.getProperty("appKey",".1t-fP8o~a0b8X7-o0.wCr6.0aPBR7~k9L");
private String authority = System.getProperty("authority","7f66d0ea-6137-4e37-a835-4530eba9b3ee");
private String cluster = System.getProperty("cluster","azureconnector.centralus");
private String database = System.getProperty("database","sanchay");
private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
@ -117,7 +117,7 @@ public class E2ETest {
}
@Test
@Ignore
// @Ignore
public void testE2EAvro() throws URISyntaxException, DataClientException, DataServiceException {
String table = tableBaseName + "avro";
ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);

Просмотреть файл

@ -77,7 +77,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> FILE_PATH;
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg = "Line number 1: This is a message from the other size";
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, msg.getBytes(), 10);
fileWriter.initializeRecordWriter(record);
@ -108,7 +108,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, String.valueOf(java.util.UUID.randomUUID())).toString() + "csv.gz";
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
for (int i = 0; i < 9; i++) {
String msg = String.format("Line number %d : This is a message from the other size", i);
@ -149,7 +149,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + "csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 30000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg = "Message";
SinkRecord record = new SinkRecord("topic", 1, null, null, null, msg, 10);
@ -168,7 +168,7 @@ public class FileWriterTest {
Function<Long, String> generateFileName2 = (Long l) -> Paths.get(path2, java.util.UUID.randomUUID().toString()).toString();
// Expect one file to be ingested as flushInterval had changed
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter2 = new FileWriter(path2, MAX_FILE_SIZE, trackFiles, generateFileName2, 1000, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
String msg2 = "Second Message";
SinkRecord record1 = new SinkRecord("topic", 1, null, null, null, msg2, 10);
@ -217,7 +217,7 @@ public class FileWriterTest {
}
return Paths.get(path, Long.toString(offset)).toString();
};
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, false, reentrantReadWriteLock, ingestionProps, BehaviorOnError.FAIL);
FileWriter fileWriter2 = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 500, reentrantReadWriteLock, ingestionProps, BehaviorOnError.FAIL);
String msg2 = "Second Message";
reentrantReadWriteLock.readLock().lock();
long recordOffset = 1;
@ -258,37 +258,41 @@ public class FileWriterTest {
fileWriter2.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 0);
}
/*
Test commented as it required shouldCompressData as false which is actually true while running
the connector as we are compression all files as gzip.
*/
@Test
public void testFileWriterCompressed() throws IOException {
String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
File folder = new File(path);
boolean mkdirs = folder.mkdirs();
Assert.assertTrue(mkdirs);
HashMap<String, Long> files = new HashMap<>();
final int MAX_FILE_SIZE = 128 * 2;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
String msg = "Message";
Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);
Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
// Expect no files to be ingested as size is small and flushInterval is big
FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
gzipOutputStream.write(msg.getBytes());
gzipOutputStream.finish();
SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, byteArrayOutputStream.toByteArray(), 10);
fileWriter.writeData(record);
fileWriter.close();
Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
}
// @Test
// public void testFileWriterCompressed() throws IOException {
// String path = Paths.get(currentDirectory.getPath(), "testGzipFileWriter2").toString();
//
// File folder = new File(path);
// boolean mkdirs = folder.mkdirs();
// Assert.assertTrue(mkdirs);
// HashMap<String, Long> files = new HashMap<>();
//
// final int MAX_FILE_SIZE = 128 * 2;
//
// ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
// GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
// String msg = "Message";
//
// Consumer<SourceFile> trackFiles = getAssertFileConsumer(msg);
//
// Function<Long, String> generateFileName = (Long l) -> Paths.get(path, java.util.UUID.randomUUID().toString()).toString() + ".csv.gz";
//
// // Expect no files to be ingested as size is small and flushInterval is big
// //FileWriter fileWriter = new FileWriter(path, MAX_FILE_SIZE, trackFiles, generateFileName, 0, false, new ReentrantReadWriteLock(), ingestionProps, BehaviorOnError.FAIL);
//
// gzipOutputStream.write(msg.getBytes());
// gzipOutputStream.finish();
// SinkRecord record = new SinkRecord("topic", 1, null, null, Schema.BYTES_SCHEMA, byteArrayOutputStream.toByteArray(), 10);
// fileWriter.writeData(record);
//
// fileWriter.close();
// // Assert.assertEquals(Objects.requireNonNull(folder.listFiles()).length, 1);
// }
static Function<SourceFile, String> getAssertFileConsumerFunction(String msg) {
return (SourceFile f) -> {

Просмотреть файл

@ -1,6 +1,5 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
@ -128,7 +127,7 @@ public class KustoSinkTaskTest {
public void getTable() {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv', 'eventDataCompression':'gz'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db2', 'table': 'table2','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
props.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
props.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
@ -146,7 +145,6 @@ public class KustoSinkTaskTest {
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getTableName(), "table2");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getDataFormat(), "json");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic2").ingestionProperties.getIngestionMapping().getIngestionMappingReference(), "Mapping");
Assert.assertEquals(kustoSinkTaskSpy.getIngestionProps("topic1").eventDataCompression, CompressionType.gz);
Assert.assertNull(kustoSinkTaskSpy.getIngestionProps("topic3"));
}
}