From 41f1513ea73f5b6e5eb798115250692a7f2f165c Mon Sep 17 00:00:00 2001 From: ohad bitton <32278684+ohadbitt@users.noreply.github.com> Date: Tue, 4 Aug 2020 16:22:27 +0300 Subject: [PATCH] Precommit null check (#34) * null check * shouldnt hide appId Co-authored-by: Ohad Bitton --- pom.xml | 2 +- .../azure/kusto/kafka/connect/sink/KustoSinkTask.java | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index f039ae2..fda4b56 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ com.microsoft.azure kafka-sink-azure-kusto jar - 1.0.0 + 1.0.1 diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index 2336e3d..1024e30 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -418,9 +418,11 @@ public class KustoSinkTask extends SinkTask { throw new ConnectException("Topic Partition not configured properly. " + "verify your `topics` and `kusto.tables.topics.mapping` configurations"); } - Long offset = writers.get(tp).lastCommittedOffset + 1; - if (offset != null) { + Long lastCommittedOffset = writers.get(tp).lastCommittedOffset; + + if (lastCommittedOffset != null) { + Long offset = lastCommittedOffset + 1L; log.debug("Forwarding to framework request to commit offset: {} for {} while the offset is {}", offset, tp, offsets.get(tp)); offsetsToCommit.put(tp, new OffsetAndMetadata(offset)); }