From e4d15149a12c0a659f1863c45c99940b3ca7e76d Mon Sep 17 00:00:00 2001 From: ohad bitton <32278684+ohadbitt@users.noreply.github.com> Date: Tue, 13 Oct 2020 00:22:20 +0300 Subject: [PATCH] Fix multijson (#45) * fix int cant be cast to long * multijson support and fix int cant be cast to long * v1.0.3 Co-authored-by: Ohad Bitton --- pom.xml | 2 +- .../kafka/connect/sink/KustoSinkTask.java | 25 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/pom.xml b/pom.xml index ee39312..a37c010 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ kafka-sink-azure-kusto jar A Kafka Connect plugin for Azure Data Explorer (Kusto) Database - 1.0.2 + 1.0.3 1.0.0 20090211 diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index a2c9ff5..d429828 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -1,10 +1,7 @@ package com.microsoft.azure.kusto.kafka.connect.sink; import com.google.common.base.Strings; -import com.microsoft.azure.kusto.data.Client; -import com.microsoft.azure.kusto.data.ClientFactory; -import com.microsoft.azure.kusto.data.ConnectionStringBuilder; -import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.*; import com.microsoft.azure.kusto.data.exceptions.DataClientException; import com.microsoft.azure.kusto.data.exceptions.DataServiceException; import com.microsoft.azure.kusto.ingest.IngestClient; @@ -149,7 +146,7 @@ public class KustoSinkTask extends SinkTask { if (mappingRef != null && !mappingRef.isEmpty()) { if (format != null) { - if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())){ + if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) { props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json); } else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())){ props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro); @@ -241,11 +238,16 @@ public class KustoSinkTask extends SinkTask { String table = mapping.getString("table"); String format = mapping.getString("format"); String mappingName = mapping.getString("mapping"); + if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) { + format = "json"; + } + boolean hasAccess = false; try { try { - KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table)); - if ((long) rs.getPrimaryResults().getData().get(0).get(0) >= 0) { + KustoResultSetTable rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table)).getPrimaryResults(); + rs.next(); + if (rs.getLong(0) >= 0) { hasAccess = true; } @@ -277,8 +279,13 @@ public class KustoSinkTask extends SinkTask { } } catch (DataServiceException e) { // Logging the error so that the trace is not lost. - log.error("{}", e); - databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table)); + if (!e.getCause().toString().contains("Forbidden")){ + log.error("{}", e); + databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table)); + } else { + log.warn("Failed to check permissions, will continue the run as the principal might still be able to ingest: {}", e); + } + } } } catch (DataClientException e) {