зеркало из
1
0
Форкнуть 0
This commit is contained in:
SanchayGupta1197 2020-07-13 16:13:06 +05:30
Родитель a9b7544d6c
Коммит 95a4fbc7b0
1 изменённых файлов: 21 добавлений и 3 удалений

Просмотреть файл

@ -193,11 +193,15 @@ public class KustoSinkTask extends SinkTask {
Client engineClient = createKustoEngineClient(config);
if (config.getTopicToTableMapping() != null) {
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
if(mappings.length()>0) {
if(isIngestorRole(mappings.getJSONObject(0), engineClient)) {
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
validateTableAccess(engineClient, mapping, config, databaseTableErrorList, accessErrorList);
}
}
}
}
String tableAccessErrorMessage = "";
if(!databaseTableErrorList.isEmpty())
@ -221,6 +225,20 @@ public class KustoSinkTask extends SinkTask {
}
}
private static boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
String database = testMapping.getString("db");
String table = testMapping.getString("table");
try {
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
} catch(DataServiceException | DataClientException err){
if(err.getCause().getMessage().contains("Forbidden:")){
log.warn("User might have ingestor privileges, table validation will be skipped for all table mappings ");
return false;
}
}
return true;
}
/**
* This function validates whether the user has the read and write access to the intended table
* before starting to sink records into ADX.