Kafka sink for Kusto

Microsoft Azure Data Explorer (Kusto) Kafka Sink

This repository contains the source code of the Kafka To ADX Sink.

Setup

Clone

git clone git://github.com/Azure/kafka-sink-azure-kusto.git
cd ./kafka-sink-azure-kusto

Build

The connector must be built locally with Maven.

Requirements

Building locally using Maven is simple:

mvn clean compile assembly:single

This should produce a JAR complete with dependencies.

Deploy

Deployment as a Kafka plugin is demonstrated here using a Docker image for convenience, but production deployment should be very similar (detailed docs can be found here).

Run Docker

docker run --rm -p 3030:3030 -p 9092:9092 -p 8081:8081 -p 8083:8083 -p 8082:8082 -p 2181:2181  -v C:\kafka-sink-azure-kusto\target\kafka-sink-azure-kusto-0.1.0-jar-with-dependencies.jar:/connectors/kafka-sink-azure-kusto-0.1.0-jar-with-dependencies.jar landoop/fast-data-dev 

Verify

Connect to the container and run: cat /var/log/broker.log /var/log/connect-distributed.log | grep -C 4 -i kusto

Add plugin

Go to http://localhost:3030/kafka-connect-ui/#/cluster/fast-data-dev/ and use the UI to add the Kusto Sink (click the NEW button, then pick kusto from the list). Example configuration:

name=KustoSinkConnector 
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector 
key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 
tasks.max=1 
topics=testing1 
kusto.tables.topics_mapping=[{'topic': 'testing1','db': 'daniel', 'table': 'KafkaTest','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'daniel', 'table': 'KafkaTest','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}]
kusto.auth.authority=XXX 
kusto.url=https://ingest-mycluster.kusto.windows.net/ 
kusto.auth.appid=XXX 
kusto.auth.appkey=XXX 
kusto.sink.tempdir=/var/tmp/ 
kusto.sink.flush_size=1000
kusto.sink.flush_interval_ms=300000 
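
As an alternative to the UI, a connector can usually be registered through the Kafka Connect REST API, which the Docker command above publishes on port 8083. The following is a minimal sketch under stated assumptions, not part of this project: `parse_properties` and `build_connector_request` are hypothetical helper names, and the Connect worker is assumed to be reachable at http://localhost:8083 with the standard `POST /connectors` endpoint.

```python
import json
import urllib.request


def parse_properties(text):
    """Parse simple key=value lines (as in the example above) into a dict."""
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=" themselves.
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


def build_connector_request(config, base_url="http://localhost:8083"):
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    # The connector name goes at the top level, the settings under "config".
    body = {"name": config["name"], "config": config}
    return urllib.request.Request(
        url=base_url + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# A shortened copy of the example configuration, for illustration only.
EXAMPLE = """
name=KustoSinkConnector
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
tasks.max=1
topics=testing1
"""

request = build_connector_request(parse_properties(EXAMPLE))
# To actually submit it (requires a running Connect worker):
# urllib.request.urlopen(request)
```

The request is only constructed here, so the sketch runs without a cluster; uncomment the last line to send it.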

The sink aggregates records into files. A file is sent to Kusto when it reaches flush_size (in bytes) or when the flush_interval_ms interval has passed, whichever comes first. For the Confluent parameters, please refer to https://docs.confluent.io/2.0.0/connect/userguide.html#configuring-connectors. For scaling, consider making tasks.max equal to the number of pods and ports.
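
The size-or-time rule can be illustrated with a small model. This is only a sketch of the behavior described above, not the connector's actual implementation: the class `FlushPolicy` and its methods are hypothetical, and whether the real thresholds are inclusive is an assumption.

```python
import time


class FlushPolicy:
    """Hypothetical model of the size-or-time flush rule (not connector code)."""

    def __init__(self, flush_size, flush_interval_ms, clock=time.monotonic):
        self.flush_size = flush_size  # threshold in bytes
        self.flush_interval_s = flush_interval_ms / 1000.0
        self.clock = clock  # injectable so the rule can be tested without waiting
        self.buffered_bytes = 0
        self.last_flush = clock()

    def add(self, record):
        """Account for a buffered record; return True if a flush is due."""
        self.buffered_bytes += len(record)
        return self.should_flush()

    def should_flush(self):
        size_reached = self.buffered_bytes >= self.flush_size
        interval_passed = self.clock() - self.last_flush >= self.flush_interval_s
        return size_reached or interval_passed

    def reset(self):
        """Call after the aggregated file has been sent."""
        self.buffered_bytes = 0
        self.last_flush = self.clock()


# Values taken from the example configuration above.
policy = FlushPolicy(flush_size=1000, flush_interval_ms=300000)
```

With the example values, a file would be flushed after roughly 1000 bytes have accumulated or after five minutes, whichever happens first.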

Create Table and Mapping

The steps are very similar to those for Event Hub: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-event-hub#create-a-target-table-in-azure-data-explorer

Publish data

In the container, you can run an interactive CLI producer like so:

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic testing1

or just pipe in a file (which contains example data):

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic testing1 < file.json
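
If no example file is at hand, one can be generated. The sketch below writes one JSON record per line, since the console producer treats each input line as a separate message. The function name `write_sample_file` and the field names `id` and `message` are placeholders, not part of this project; the fields should match the JSON mapping of your target table.

```python
import json


def write_sample_file(path, count):
    """Write `count` JSON records, one per line, for the console producer."""
    with open(path, "w", encoding="utf-8") as handle:
        for i in range(count):
            # Placeholder fields; align them with your table's JSON mapping.
            record = {"id": i, "message": f"event {i}"}
            handle.write(json.dumps(record) + "\n")
```

Calling `write_sample_file("file.json", 5)` would create a five-line file that can be piped into the producer as shown above.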

Query Data

Make sure no errors occurred during ingestion:

.show ingestion failures

See that newly ingested data becomes available for querying

KafkaTest | count

Supported formats

csv, json, avro, apacheAvro, parquet, orc, tsv, scsv, sohsv, psv, txt.

Note: for avro, apacheAvro, parquet and orc, each record is treated as a complete file and is sent separately, without aggregation. Each record is expected to be a byte array containing the full file. Use value.converter=org.apache.kafka.connect.converters.ByteArrayConverter.

Supported compressions

The Kusto Kafka connector can accept compressed data. The compression is specified in topics_mapping in the configuration, under 'eventDataCompression', and can be any compression type that Kusto accepts. With this setting, files are not aggregated in the connector and are sent straight for ingestion.
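
As an illustration, a producer might prepare a gzip payload along these lines. This is a sketch under assumptions: it supposes that each Kafka message value carries one complete gzip-compressed blob (matching 'eventDataCompression':'gz' from the example configuration), and `gzip_records` is a hypothetical helper name.

```python
import gzip
import json


def gzip_records(records):
    """Serialize records as JSON lines and gzip them into a single payload."""
    raw = "\n".join(json.dumps(r) for r in records).encode("utf-8")
    return gzip.compress(raw)


# The resulting bytes would be sent as the value of a single Kafka message.
payload = gzip_records([{"id": 1}, {"id": 2}])
```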

Avro example

One can use this gist, FilesKafkaProducer, to create a JAR file that can be used as a file producer which sends files as bytes to Kafka.

  • Create an Avro file as in src\test\resources\data.avro
  • Copy the JAR: docker cp C:\Users\ohbitton\IdeaProjects\kafka-producer-test\target\kafka-producer-all.jar <container id>:/FilesKafkaProducer.jar
  • Connect to the container: docker exec -it <id> bash
  • Run from the container: java -jar FilesKafkaProducer.jar fileName [topic] [times]
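
The idea behind such a file producer can be sketched as follows. This is not the gist's code: `send_file` is a hypothetical function that mirrors the `fileName [topic] [times]` arguments, the default topic is an assumption, and `send` stands in for whatever Kafka client call is actually used to publish a byte-array value.

```python
from pathlib import Path


def send_file(send, file_name, topic="testing1", times=1):
    """Send the full contents of a file as one message value, `times` times.

    `send` is any callable taking (topic, value_bytes); with a real Kafka
    client it would wrap that client's send call.
    """
    payload = Path(file_name).read_bytes()  # whole file, unmodified
    for _ in range(times):
        send(topic, payload)
    return len(payload)
```

Passing the sender in as a callable keeps the sketch independent of any particular Kafka client library; the important point is that each message value is the entire file, as the byte-array converter setup above expects.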

Need Support?

  • Have a feature request for SDKs? Please post it on User Voice to help us prioritize
  • Have a technical question? Ask on Stack Overflow with tag "azure-data-explorer"
  • Need Support? Every customer with an active Azure subscription has access to support with guaranteed response time. Consider submitting a ticket and get assistance from Microsoft support team
  • Found a bug? Please help us fix it by thoroughly documenting it and filing an issue.

Contribute

We gladly accept community contributions.

  • Issues: Please report bugs using the Issues section of GitHub
  • Forums: Interact with the development teams on StackOverflow or the Microsoft Azure Forums
  • Source Code Contributions: If you would like to become an active contributor to this project please follow the instructions provided in Contributing.md.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

For general suggestions about Microsoft Azure please use our UserVoice forum.