зеркало из
1
0
Форкнуть 0

Docs and versions for connector and java client

This commit is contained in:
Ohad Bitton 2019-12-19 16:53:00 +02:00
Родитель 4cf4b7617c
Коммит ba6c6789d9
3 изменённых файлов: 31 добавлений и 6 удалений

Просмотреть файл

@ -54,7 +54,7 @@ key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
tasks.max=1
topics=testing1
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'Mapping'}]
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'},]
kusto.auth.authority=XXX
kusto.url=https://ingest-mycluster.kusto.windows.net/
kusto.auth.appid=XXX
@ -89,9 +89,23 @@ KafkaTest | count
#Supported formats
csv, json, avro, parquet, tsv, scsv, sohsv, psv, txt.
> Note - avro and parquet files are sent each record separately without aggregation, and are expected to be sent as a byte array containing the full file.
> Note - avro and parquet files are sent each record (file) separately without aggregation, and are expected to be sent as a byte array containing the full file.
Use value.converter=org.apache.kafka.connect.converters.ByteArrayConverter.
#Supported compressions
Kusto Kafka connector can get compressed data, this can be specified in the topics_mapping in the configuration under
'eventDataCompression', this can get all the compression types kusto accepts. Using this configuration files does'nt get
aggregated in the connector and are sent straight for ingestion.
#Avro example
Props
One can use this gist [FilesKafkaProducer]("https://gist.github.com/ohadbitt/8475dc9f63df1c0d0bc322e9b00fdd00") to create
a JAR file that can be used as a file producer which sends files as bytes to kafka.
Create an avro file as in `src\test\resources\data.avro`
copy the jar `docker cp C:\Users\ohbitton\IdeaProjects\kafka-producer-test\target\kafka-producer-all.jar <container id>:/FilesKafkaProducer.jar`
Connect to the container `docker exec -it <id> bash`.
Run from the container `java -jar FilesKafkaProducer.jar fileName [topic] [times]`
## Need Support?
- **Have a feature request for SDKs?** Please post it on [User Voice](https://feedback.azure.com/forums/915733-azure-data-explorer) to help us prioritize
- **Have a technical question?** Ask on [Stack Overflow with tag "azure-data-explorer"](https://stackoverflow.com/questions/tagged/azure-data-explorer)

Просмотреть файл

@ -8,7 +8,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<version>0.1.0</version>
<version>0.2.0</version>
<build>
<plugins>
<plugin>
@ -66,18 +66,19 @@
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>
<commonio.version>2.6</commonio.version>
<kusto-sdk.version>1.3.0</kusto-sdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-data</artifactId>
<version>1.1.0</version>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure.kusto</groupId>
<artifactId>kusto-ingest</artifactId>
<version>1.1.0</version>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>

Просмотреть файл

@ -19,6 +19,7 @@ import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
@ -169,7 +170,11 @@ public class KustoSinkTask extends SinkTask {
log.error("Error closing writer for {}. Error: {}", tp, e.getMessage());
}
}
try {
kustoIngestClient.close();
} catch (IOException e) {
log.error("Error closing kusto client", e);
}
writers.clear();
assignment.clear();
}
@ -202,6 +207,11 @@ public class KustoSinkTask extends SinkTask {
for (TopicPartitionWriter writer : writers.values()) {
writer.close();
}
try {
kustoIngestClient.close();
} catch (IOException e) {
log.error("Error closing kusto client", e);
}
}
@Override