Kafka sink for Kusto

Microsoft Azure Data Explorer (Kusto) Kafka Sink

This repository contains the source code of the Kafka To ADX Sink.

Setup

Clone

git clone https://github.com/Azure/kafka-sink-azure-kusto.git
cd ./kafka-sink-azure-kusto

Build

The connector has to be built locally with Maven.

Requirements

A JDK and Maven available on the build machine.

Build the JAR with:

mvn clean compile assembly:single

This should produce a JAR, complete with dependencies, in the target directory.

Deploy

Deployment as a Kafka plugin is demonstrated here using a Docker image for convenience; a production deployment should be very similar (detailed docs can be found here).

Run Docker

docker run --rm -p 3030:3030 -p 9092:9092 -p 8081:8081 -p 8083:8083 -p 8082:8082 -p 2181:2181  -v C:\kafka-sink-azure-kusto\target\kafka-sink-azure-kusto-0.1.0-jar-with-dependencies.jar:/connectors/kafka-sink-azure-kusto-0.1.0-jar-with-dependencies.jar landoop/fast-data-dev 

Verify

Connect to the container and run:

cat /var/log/broker.log /var/log/connect-distributed.log | grep -C 4 -i kusto

Add plugin

Go to http://localhost:3030/kafka-connect-ui/#/cluster/fast-data-dev/ and add the Kusto sink through the UI (click NEW, then pick Kusto from the list). Example configuration:


name=KustoSinkConnector 
connector.class=com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector 

key.converter=org.apache.kafka.connect.storage.StringConverter 
value.converter=org.apache.kafka.connect.storage.StringConverter 

tasks.max=1 
topics=testing1,testing2

kusto.tables.topics.mapping=[{'topic': 'testing1','db': 'test_db', 'table': 'test_table_1','format': 'json', 'mapping':'JsonMapping'},{'topic': 'testing2','db': 'test_db', 'table': 'test_table_2','format': 'csv', 'mapping':'CsvMapping', 'eventDataCompression':'gz'}] 

kusto.url=https://ingest-mycluster.kusto.windows.net/ 

aad.auth.appid=<AAD application ID>
aad.auth.appkey=<AAD application key>
aad.auth.authority=<AAD authority>

kusto.sink.tempdir=/var/tmp/ 
flush.size.bytes=1000
flush.interval.ms=300000

behavior.on.error=FAIL

dlq.bootstrap.servers=localhost:9092
dlq.topic.name=test-topic-error

errors.retry.max.time.ms=60000
errors.retry.backoff.time.ms=5000
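Outside the UI, the same settings can in principle be submitted through the Kafka Connect REST API. The following is a minimal sketch in Python, assuming the Connect REST endpoint is the one published on port 8083 by the Docker command above. The function names are illustrative and not part of this project.

```python
import json
import urllib.request


def properties_to_payload(text):
    """Turn key=value lines into the {"name", "config"} body used by POST /connectors."""
    config = {}
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blanks, comments, and keys that have not been given a value yet.
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        config[key.strip()] = value.strip()
    return {"name": config["name"], "config": config}


def create_connector(payload, connect_url="http://localhost:8083"):
    """POST the payload to Kafka Connect (URL is an assumption based on the port mapping)."""
    request = urllib.request.Request(
        connect_url + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

To use it, read the properties text from a file, pass it to properties_to_payload, and hand the result to create_connector. Keys left without a value are skipped by the parser, so the mandatory aad.auth.* settings must be filled in first.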

The sink aggregates records into files. A file is sent to Kusto once it reaches flush.size.bytes (in bytes) or once flush.interval.ms (in milliseconds) has elapsed, whichever comes first.

For the general Kafka Connect parameters, see the Confluent user guide: https://docs.confluent.io/2.0.0/connect/userguide.html#configuring-connectors

For scaling, consider setting tasks.max equal to the number of pods and ports.
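The size-or-interval flush rule can be illustrated with a short Python sketch. This is not the connector's implementation (which is written in Java and writes to files); the class and method names are hypothetical, and the interval is only checked when should_flush or add is called, whereas a real sink would likely use a timer.

```python
import time


class Aggregator:
    """Buffers records and reports when the buffered batch is due for flushing."""

    def __init__(self, flush_size_bytes, flush_interval_ms, clock=time.monotonic):
        self.flush_size_bytes = flush_size_bytes
        self.flush_interval_ms = flush_interval_ms
        self.clock = clock  # injectable so the rule can be exercised without sleeping
        self.buffer = bytearray()
        self.started_at = clock()

    def should_flush(self):
        """True if there is data and either the size or the interval threshold is met."""
        elapsed_ms = (self.clock() - self.started_at) * 1000
        reached_size = len(self.buffer) >= self.flush_size_bytes
        interval_passed = elapsed_ms >= self.flush_interval_ms
        return bool(self.buffer) and (reached_size or interval_passed)

    def add(self, record):
        """Append one record; return the batch if it is due, otherwise None."""
        self.buffer += record
        if self.should_flush():
            return self.flush()
        return None

    def flush(self):
        """Return the buffered bytes and start a new, empty batch."""
        batch = bytes(self.buffer)
        self.buffer = bytearray()
        self.started_at = self.clock()
        return batch
```

With the example values flush.size.bytes=1000 and flush.interval.ms=300000, a batch would go out as soon as 1000 bytes accumulate, or after five minutes if traffic is slow.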

Create Table and Mapping

Creating the target table and mapping is very similar to the Event Hub walkthrough: https://docs.microsoft.com/en-us/azure/data-explorer/ingest-data-event-hub#create-a-target-table-in-azure-data-explorer

Publish data

Inside the container, you can run an interactive CLI producer like so:

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic testing1

Or pipe in a file that contains example data:

/usr/local/bin/kafka-console-producer --broker-list localhost:9092 --topic testing1 < file.json
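If no sample file is at hand, one can be generated. The console producer sends each input line as one message, so the sketch below writes one JSON object per line. The field names id and message are purely hypothetical and need to match whatever table and JSON mapping were created in Kusto.

```python
import json


def sample_lines(count):
    """Build `count` single-line JSON records (column names are placeholders)."""
    return [json.dumps({"id": i, "message": "event %d" % i}) for i in range(count)]


def write_sample_file(path, count):
    """Write the records to `path`, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as handle:
        for line in sample_lines(count):
            handle.write(line + "\n")
    return path
```

Calling write_sample_file("file.json", 100) would produce a file suitable for the piped command above.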

Query Data

Make sure no errors occurred during ingestion:

.show ingestion failures

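Check that the newly ingested data is available for querying (replace KafkaTest with your target table, for example test_table_1 from the configuration above):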

KafkaTest | count

Supported formats

csv, json, avro, apacheAvro, parquet, orc, tsv, scsv, sohsv, psv, txt.

Note: avro, apacheAvro, parquet and orc data is not aggregated. Each record is treated as a complete file and sent for ingestion on its own, so each Kafka message must be a byte array containing the full file. For these formats, set:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
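As a sanity check before deploying, the mapping string can be inspected for unsupported formats and for formats that need the byte array converter. The following Python sketch assumes the single-quoted style used in the example configuration, which happens to be a valid Python literal. The helper name is illustrative, and the connector's own validation may differ (for example in case sensitivity).

```python
import ast

SUPPORTED_FORMATS = {
    "csv", "json", "avro", "apacheAvro", "parquet", "orc",
    "tsv", "scsv", "sohsv", "psv", "txt",
}
# Formats that are sent one file per record and need ByteArrayConverter.
BINARY_FORMATS = {"avro", "apacheAvro", "parquet", "orc"}


def check_mapping(mapping_text):
    """Return (topic, format, needs_byte_array_converter) for each mapping entry."""
    entries = ast.literal_eval(mapping_text)
    results = []
    for entry in entries:
        fmt = entry["format"]
        if fmt not in SUPPORTED_FORMATS:
            raise ValueError("unsupported format %r for topic %r" % (fmt, entry["topic"]))
        results.append((entry["topic"], fmt, fmt in BINARY_FORMATS))
    return results
```

Any entry flagged True in the third position belongs on a connector configured with the ByteArrayConverter.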

Supported compressions

The connector can accept data that is already compressed. Declare the compression per topic in kusto.tables.topics.mapping with the eventDataCompression key (for example 'eventDataCompression':'gz'); any compression type that Kusto accepts can be used. With this setting, files are not aggregated in the connector and are sent straight for ingestion.
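On the producer side, a message for a topic mapped with 'eventDataCompression':'gz' would be gzip-compressed before it is sent. A minimal Python sketch follows, assuming the payload is CSV text; how the bytes are then published to Kafka is left to whichever producer client is in use.

```python
import gzip


def compress_payload(text):
    """Gzip-compress a text payload so it can be sent as the message value."""
    return gzip.compress(text.encode("utf-8"))
```

Since the connector forwards such data without aggregating it, each message should presumably hold a complete, independently decompressible gzip stream, which is what gzip.compress returns.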

Avro example

The FilesKafkaProducer gist can be built into a JAR that acts as a file producer, sending whole files to Kafka as bytes.

  • Create an avro file, as in src\test\resources\data.avro.
  • Copy the JAR into the container: docker cp C:\Users\ohbitton\IdeaProjects\kafka-producer-test\target\kafka-producer-all.jar <container id>:/FilesKafkaProducer.jar
  • Connect to the container: docker exec -it <container id> bash
  • Run from the container: java -jar FilesKafkaProducer.jar fileName [topic] [times]
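Conceptually, such a file producer reads the whole file and publishes it as a single message value, repeated a given number of times. The Python sketch below illustrates that idea only. It is not the gist's code, and send is a hypothetical callable standing in for a real Kafka producer client.

```python
def read_whole_file(path):
    """Read the complete file as bytes, since one message must carry the full file."""
    with open(path, "rb") as handle:
        return handle.read()


def send_file_bytes(payload, topic, times, send):
    """Call send(topic, payload) `times` times; `send` wraps the actual producer."""
    for _ in range(times):
        send(topic, payload)
    return times
```

The arguments mirror the fileName, topic, and times parameters of the command line above.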

Need Support?

  • Have a feature request for SDKs? Please post it on User Voice to help us prioritize
  • Have a technical question? Ask on Stack Overflow with tag "azure-data-explorer"
  • Need support? Every customer with an active Azure subscription has access to support with a guaranteed response time. Consider submitting a ticket to get assistance from the Microsoft support team.
  • Found a bug? Please help us fix it by thoroughly documenting it and filing an issue.

Contribute

We gladly accept community contributions.

  • Issues: Please report bugs using the Issues section of GitHub
  • Forums: Interact with the development teams on StackOverflow or the Microsoft Azure Forums
  • Source Code Contributions: If you would like to become an active contributor to this project please follow the instructions provided in Contributing.md.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

For general suggestions about Microsoft Azure please use our UserVoice forum.