From 10a722c165d6081a85b4ca6f51e0dee06b5ae2f3 Mon Sep 17 00:00:00 2001 From: hasher Date: Mon, 29 Jun 2020 15:36:34 +0530 Subject: [PATCH] Fix the indentation --- .../kusto/kafka/connect/sink/FileWriter.java | 4 +--- .../kafka/connect/sink/KustoSinkTask.java | 24 +++++++++---------- .../connect/sink/TopicPartitionWriter.java | 18 +++++++------- 3 files changed, 22 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java index 2f1a975..b6766db 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/FileWriter.java @@ -260,8 +260,6 @@ public class FileWriter implements Closeable { currentFile.rawBytes = recordWriter.getDataSize(); currentFile.zippedBytes += countingStream.numBytes; currentFile.numRecords++; - currentFile.zippedBytes = countingStream.numBytes; - currentFile.numRecords++; if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) { rotate(record.kafkaOffset()); resetFlushTimer(true); @@ -289,7 +287,7 @@ public class FileWriter implements Closeable { else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.BYTES)){ recordWriterProvider = new ByteRecordWriterProvider(); } else { - throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format",record.valueSchema().type())); + throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type())); } } diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index d9af080..6ba0c05 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -69,10 +69,10 @@ public class KustoSinkTask extends SinkTask { } ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials( - config.getKustoUrl(), - config.getAuthAppid(), - config.getAuthAppkey(), - config.getAuthAuthority() + config.getKustoUrl(), + config.getAuthAppid(), + config.getAuthAppkey(), + config.getAuthAuthority() ); kcsb.setClientVersionForTracing(Version.CLIENT_NAME + ":" + Version.getVersion()); @@ -80,8 +80,8 @@ public class KustoSinkTask extends SinkTask { } throw new ConfigException("Failed to initialize KustoIngestClient, please " + - "provide valid credentials. Either Kusto username and password or " + - "Kusto appId, appKey, and authority should be configured."); + "provide valid credentials. Either Kusto username and password or " + + "Kusto appId, appKey, and authority should be configured."); } catch (Exception e) { throw new ConnectException("Failed to initialize KustoIngestClient", e); } @@ -95,10 +95,10 @@ public class KustoSinkTask extends SinkTask { throw new ConfigException("Kusto authentication missing App Key."); } ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials( - engineClientURL, - config.getAuthAppid(), - config.getAuthAppkey(), - config.getAuthAuthority() + engineClientURL, + config.getAuthAppid(), + config.getAuthAppkey(), + config.getAuthAuthority() ); kcsb.setClientVersionForTracing(Version.CLIENT_NAME + ":" + Version.getVersion()); @@ -106,8 +106,8 @@ public class KustoSinkTask extends SinkTask { } throw new ConfigException("Failed to initialize KustoEngineClient, please " + - "provide valid credentials. Either Kusto username and password or " + - "Kusto appId, appKey, and authority should be configured."); + "provide valid credentials. Either Kusto username and password or " + + "Kusto appId, appKey, and authority should be configured."); } catch (Exception e) { throw new ConnectException("Failed to initialize KustoEngineClient", e); } diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java index 010f42a..ac97132 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java @@ -124,20 +124,20 @@ class TopicPartitionWriter { long sleepTimeMs = retryBackOffTime; log.error("Failed to ingest records into KustoDB, backing off and retrying ingesting records after {} milliseconds.", sleepTimeMs); try { - TimeUnit.MILLISECONDS.sleep(sleepTimeMs); + TimeUnit.MILLISECONDS.sleep(sleepTimeMs); } catch (InterruptedException interruptedErr) { - if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) { - log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName); - fileDescriptor.records.forEach(this::sendFailedRecordToDlq); - } - throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e); + if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) { + log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName); + fileDescriptor.records.forEach(this::sendFailedRecordToDlq); + } + throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e); } } else { - if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) { + if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) { log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName); fileDescriptor.records.forEach(this::sendFailedRecordToDlq); - } - throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e); + } + throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e); } }