зеркало из
1
0
Форкнуть 0
This commit is contained in:
Ohad Bitton 2019-12-24 13:41:15 +02:00
Родитель f9168de831
Коммит e70fa6b78b
4 изменённых файлов: 10 добавлений и 6 удалений

Просмотреть файл

@ -88,8 +88,8 @@ KafkaTest | count
```
#Supported formats
csv, json, avro, parquet, tsv, scsv, sohsv, psv, txt.
> Note - avro and parquet files are sent each record (file) separately without aggregation, and are expected to be sent as a byte array containing the full file.
csv, json, avro, parquet, orc, tsv, scsv, sohsv, psv, txt.
> Note - avro, parquet and orc files are sent each record (file) separately without aggregation, and are expected to be sent as a byte array containing the full file.
Use value.converter=org.apache.kafka.connect.converters.ByteArrayConverter.
#Supported compressions

Просмотреть файл

@ -66,19 +66,19 @@
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
<commonio.version>2.6</commonio.version>
<kusto-sdk.version>1.3.0</kusto-sdk.version>
<kusto-sdk.version>1.4.0</kusto-sdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
<version>1.3.0</version>
<version>${kusto-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-ingest</artifactId>
<version>1.3.0</version>
<version>${kusto-sdk.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>

Просмотреть файл

@ -109,8 +109,10 @@ public class KustoSinkTask extends SinkTask {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json);
} else if (format.equals(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.avro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())){
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.parquet);
} else if (format.equals(IngestionProperties.DATA_FORMAT.orc.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.orc);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv);
}
@ -255,5 +257,6 @@ public class KustoSinkTask extends SinkTask {
@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) throws ConnectException {
// do nothing , rolling files can handle writing
}
}

Просмотреть файл

@ -95,6 +95,7 @@ public class TopicPartitionWriter {
public void open() {
boolean flushImmediately = ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString())
|| ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.orc.toString())
|| this.eventDataCompression != null;
fileWriter = new FileWriter(