This commit is contained in:
fahad 2020-06-11 18:50:38 +05:30
Родитель f7c5a8b6af
Коммит 96dc07bea8
1 изменённых файлов: 12 добавлений и 20 удалений

Просмотреть файл

@ -46,9 +46,10 @@ import java.util.concurrent.TimeoutException;
*/
public class KustoSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
static final int ROLE_INDEX = 0;
static final int PRINCIPAL_DISPLAY_NAME_INDEX = 2;
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
static final String FETCH_PRINCIPAL_ROLES_QUERY = ".show table %s principal roles";
static final int ROLE_INDEX = 3;
private final Set<TopicPartition> assignment;
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
@ -215,7 +216,7 @@ public class KustoSinkTask extends SinkTask {
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
validateTableAccess(config, engineClient, mapping, databaseTableErrorList, accessErrorList);
validateTableAccess(engineClient, mapping, databaseTableErrorList, accessErrorList);
}
}
String tableAccessErrorMessage = "";
@ -244,31 +245,22 @@ public class KustoSinkTask extends SinkTask {
/**
* This function validates whether the user has the read and write access to the intended table
* before starting to sink records into ADX.
* @param config KustoSinkConfig class as defined by the user.
* @param engineClient Client connection to run queries.
* @param mapping JSON Object containing a Table mapping.
*/
private static void validateTableAccess(KustoSinkConfig config, Client engineClient, JSONObject mapping, List<String> databaseTableErrorList, List<String> accessErrorList) throws JSONException {
String getPrincipalsQuery = ".show table %s principals";
private static void validateTableAccess(Client engineClient, JSONObject mapping, List<String> databaseTableErrorList, List<String> accessErrorList) throws JSONException {
String database = mapping.getString("db");
String table = mapping.getString("table");
try {
Results rs = engineClient.execute(database, String.format(getPrincipalsQuery, table));
String authenticateWith;
if (config.getAuthAppid() != null) {
authenticateWith = config.getAuthAppid();
} else {
authenticateWith=config.getAuthUsername();
}
Results rs = engineClient.execute(database, String.format(FETCH_PRINCIPAL_ROLES_QUERY, table));
boolean hasAccess = false;
for (int i = 0; i<rs.getValues().size(); i++) {
ArrayList<String> principal = rs.getValues().get(i);
if (principal.get(PRINCIPAL_DISPLAY_NAME_INDEX).contains(authenticateWith)) {
if (principal.get(ROLE_INDEX).toLowerCase().contains("admin") ||
principal.get(ROLE_INDEX).toLowerCase().contains("ingestor")) {
hasAccess = true;
break;
}
if (principal.get(ROLE_INDEX).toLowerCase().contains("admin") ||
principal.get(ROLE_INDEX).toLowerCase().contains("ingestor")) {
hasAccess = true;
break;
}
}
if (!hasAccess) {