зеркало из
1
0
Форкнуть 0
This commit is contained in:
SanchayGupta1197 2020-06-05 09:31:36 +05:30
Родитель 903a45ecbf
Коммит b2a54d2e14
2 изменённых файлов: 39 добавлений и 20 удалений

Просмотреть файл

@ -26,7 +26,7 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_SINK_FLUSH_SIZE = "kusto.sink.flush_size";
static final String KUSTO_SINK_FLUSH_INTERVAL_MS = "kusto.sink.flush_interval_ms";
static final String KUSTO_SINK_WRITE_TO_FILES = "kusto.sink.write_to_files";
static final String KUSTO_AUTO_TABLE_CREATE = "kusto.auto_table_create";
static final String KUSTO_SINK_AUTO_TABLE_CREATE = "kusto.sink.auto_table_create";
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
@ -40,15 +40,15 @@ public class KustoSinkConfig extends AbstractConfig {
return new ConfigDef()
.define(KUSTO_URL, Type.STRING, Importance.HIGH, "Kusto cluster url")
.define(KUSTO_TABLES_MAPPING, Type.STRING, null, Importance.HIGH, "Kusto target tables mapping (per topic mapping, 'topic1:table1;topic2:table2;')")
.define(KUSTO_AUTH_USERNAME, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: username")
.define(KUSTO_AUTH_PASSWORD, Type.STRING, null, Importance.HIGH, "Kusto auth using username,password combo: password")
.define(KUSTO_AUTH_APPID, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app id")
.define(KUSTO_AUTH_APPKEY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app key")
.define(KUSTO_AUTH_AUTHORITY, Type.STRING, null, Importance.HIGH, "Kusto auth using appid,appkey combo: authority")
.define(KUSTO_AUTH_USERNAME, Type.PASSWORD, null, Importance.HIGH, "Kusto auth using username,password combo: username")
.define(KUSTO_AUTH_PASSWORD, Type.PASSWORD, null, Importance.HIGH, "Kusto auth using username,password combo: password")
.define(KUSTO_AUTH_APPID, Type.PASSWORD, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app id")
.define(KUSTO_AUTH_APPKEY, Type.PASSWORD, null, Importance.HIGH, "Kusto auth using appid,appkey combo: app key")
.define(KUSTO_AUTH_AUTHORITY, Type.PASSWORD, null, Importance.HIGH, "Kusto auth using appid,appkey combo: authority")
.define(KUSTO_SINK_TEMPDIR, Type.STRING, System.getProperty("java.io.tempdir"), Importance.LOW, "Temp dir that will be used by kusto sink to buffer records. defaults to system temp dir")
.define(KUSTO_SINK_FLUSH_SIZE, Type.LONG, FileUtils.ONE_MB, Importance.HIGH, "Kusto sink max buffer size (per topic+partition combo)")
.define(KUSTO_SINK_FLUSH_INTERVAL_MS, Type.LONG, TimeUnit.MINUTES.toMillis(5), Importance.HIGH, "Kusto sink max staleness in milliseconds (per topic+partition combo)")
.define(KUSTO_AUTO_TABLE_CREATE, Type.BOOLEAN, false, Importance.LOW,"Mark true to allow connector to create the table if it not exists.");
.define(KUSTO_SINK_AUTO_TABLE_CREATE, Type.BOOLEAN, false, Importance.LOW,"If true, allows the connector to create the table in Kusto if it not already exists.");
}
public String getKustoUrl() {
@ -91,6 +91,8 @@ public class KustoSinkConfig extends AbstractConfig {
return this.getLong(KUSTO_SINK_FLUSH_INTERVAL_MS);
}
public boolean getKustoAutoTableCreate() {return this.getBoolean(KUSTO_AUTO_TABLE_CREATE); }
public boolean getKustoAutoTableCreate() {
return this.getBoolean(KUSTO_SINK_AUTO_TABLE_CREATE);
}
}

Просмотреть файл

@ -20,6 +20,7 @@ import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -102,8 +103,6 @@ public class KustoSinkTask extends SinkTask {
String db = mapping.getString("db");
String table = mapping.getString("table");
validateTableAccess(config, db, table);
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
@ -156,14 +155,30 @@ public class KustoSinkTask extends SinkTask {
return topicsToIngestionProps.get(topic);
}
private void validateTableMappings(KustoSinkConfig config) {
try {
if (config.getKustoTopicToTableMapping() != null) {
JSONArray mappings = new JSONArray(config.getKustoTopicToTableMapping());
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
String db = mapping.getString("db");
String table = mapping.getString("table");
validateTableAccess(config,db,table);
}
}
} catch (JSONException e) {
throw new ConfigException(String.format("Error trying to parse kusto ingestion props %s", e.getMessage()));
}
}
/**
* This function validates whether the user has the read and write access to the intended table
* before starting to sink records into ADX
* @param config config class as defined by the user.
* before starting to sink records into ADX.
* @param config KustoSinkConfig class as defined by the user.
* @param database Name of the database the user needs to write records in.
* @param table The table to sink records into.
*/
public static void validateTableAccess(KustoSinkConfig config, String database, String table) {
private static void validateTableAccess(KustoSinkConfig config, String database, String table) {
ConnectionStringBuilder engineCsb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
@ -174,11 +189,12 @@ public class KustoSinkTask extends SinkTask {
);
try {
Client engineClient = ClientFactory.createClient(engineCsb);
//To check whether the table and database exist
// To check whether the table and database exist and fetch the schema of the existing table.
Results rs = engineClient.execute(database, table);
List<String> randomValueList = new ArrayList<>();
String ingestQuery = ".ingest inline into table %s <| %s";
//Creating random value liSt to be inserted into the test record
// Creating random value list to be inserted as a test record into the table.
// TODO Add the other datatypes supported in Kusto.
for (Entry entry : rs.getColumnNameToType().entrySet()) {
switch (entry.getValue().toString().toLowerCase()) {
case "string":
@ -193,16 +209,17 @@ public class KustoSinkTask extends SinkTask {
}
}
String randomValues = String.join(",", randomValueList);
// Adding a temporary record in order to check for write privileges.
rs = engineClient.execute(database, String.format(ingestQuery, table, randomValues));
// Extracting ExtentId to delete the temporary record.
String extentId = rs.getValues().get(0).get(0);
//Deleting the temporary record.
// Deleting the temporary record.
engineClient.execute(database, String.format(".drop extent %s", extentId));
log.info("User has read and Write access to the table {}", table);
log.info("User has appropriate permissions to sink data into the Kusto table={}", table);
} catch (URISyntaxException e) {
throw new ConnectException("Unable to connect to Server", e);
throw new ConnectException("Unable to connect to ADX(Kusto) instance due to invalid URL", e);
} catch (DataClientException e) {
e.printStackTrace();
throw new ConnectException("Unable to connect to ADX(Kusto) instance", e);
} catch (DataServiceException e) {
if (e.getCause().getMessage().contains("Database")) {
throw new ConfigException(
@ -259,7 +276,7 @@ public class KustoSinkTask extends SinkTask {
try {
KustoSinkConfig config = new KustoSinkConfig(props);
String url = config.getKustoUrl();
validateTableMappings(config);
topicsToIngestionProps = getTopicsToIngestionProps(config);
// this should be read properly from settings
kustoIngestClient = createKustoIngestClient(config);