From 95a4fbc7b05e4309626ab7c1c6b6f67ec37a83d0 Mon Sep 17 00:00:00 2001 From: SanchayGupta1197 Date: Mon, 13 Jul 2020 16:13:06 +0530 Subject: [PATCH] Resolved PR changes --- .../kafka/connect/sink/KustoSinkTask.java | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java index 5dc36d3..2ffa55f 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/KustoSinkTask.java @@ -193,9 +193,13 @@ public class KustoSinkTask extends SinkTask { Client engineClient = createKustoEngineClient(config); if (config.getTopicToTableMapping() != null) { JSONArray mappings = new JSONArray(config.getTopicToTableMapping()); - for (int i = 0; i < mappings.length(); i++) { - JSONObject mapping = mappings.getJSONObject(i); - validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList); + if(mappings.length()>0) { + if(isIngestorRole(mappings.getJSONObject(0), engineClient)) { + for (int i = 0; i < mappings.length(); i++) { + JSONObject mapping = mappings.getJSONObject(i); + validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList); + } + } } } String tableAccessErrorMessage = ""; @@ -221,6 +225,20 @@ public class KustoSinkTask extends SinkTask { } } + private static boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException { + String database = testMapping.getString("db"); + String table = testMapping.getString("table"); + try { + KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table)); + } catch(DataServiceException | DataClientException err){ + if(err.getCause().getMessage().contains("Forbidden:")){ + log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings "); + return false; + } + } + return true; + } + /** * This function validates whether the user has the read and write access to the intended table * before starting to sink records into ADX.