From c990e58405a9c628610074b74980ce5ff6ad1d91 Mon Sep 17 00:00:00 2001 From: hasher Date: Tue, 14 Jul 2020 16:32:00 +0530 Subject: [PATCH] REsolve the pr comments and clean the code --- .../azure/kusto/kafka/connect/sink/KustoSinkTask.java | 4 ++-- .../azure/kusto/kafka/connect/sink/TopicPartitionWriter.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index 2ffa55f..3ddfb5e 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -193,7 +193,7 @@ public class KustoSinkTask extends SinkTask { Client engineClient = createKustoEngineClient(config); if (config.getTopicToTableMapping() != null) { JSONArray mappings = new JSONArray(config.getTopicToTableMapping()); - if(mappings.length()>0) { + if(mappings.length() > 0) { if(isIngestorRole(mappings.getJSONObject(0), engineClient)) { for (int i = 0; i < mappings.length(); i++) { JSONObject mapping = mappings.getJSONObject(i); @@ -225,7 +225,7 @@ public class KustoSinkTask extends SinkTask { } } - private static boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException { + private boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException { String database = testMapping.getString("db"); String table = testMapping.getString("table"); try { diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java index 13a7922..2dd3dda 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java @@ -57,7 +57,7 @@ class TopicPartitionWriter { private final BehaviorOnError behaviorOnError; TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps, - KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer kafkaProducer) + KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer dlqProducer) { this.tp = tp; this.client = client; @@ -73,7 +73,7 @@ class TopicPartitionWriter { this.behaviorOnError = config.getBehaviorOnError(); this.isDlqEnabled = isDlqEnabled; this.dlqTopicName = dlqTopicName; - this.kafkaProducer = kafkaProducer; + this.kafkaProducer = dlqProducer; }