added deprecated configs and cleanup

fahad 2020-06-11 01:14:31 +05:30
Parent fe992a784b
Commit a26a2ce111
2 changed files with 99 additions and 177 deletions

View file

@@ -6,32 +6,17 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.testng.util.Strings;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class KustoSinkConfig extends AbstractConfig {
enum ErrorTolerance {
ALL, NONE;
/**
* Gets the names of the available enum values.
* @return array of enum value names
*/
public static String[] getNames() {
return Arrays
.stream(ErrorTolerance.class.getEnumConstants())
.map(Enum::name)
.toArray(String[]::new);
}
}
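// Illustrative note (not part of this commit): getNames() returns {"ALL", "NONE"}, so the
// validator used further down could be built from the enum instead of hard-coded literals,
// e.g. ConfigDef.ValidString.in(ErrorTolerance.getNames()), since ValidString.in takes a String vararg.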
// TODO: this might need to be per kusto cluster...
static final String KUSTO_URL_CONF = "kusto.url";
private static final String KUSTO_URL_DOC = "Kusto cluster URL.";
private static final String KUSTO_URL_DOC = "Kusto cluster ingestion URL.";
private static final String KUSTO_URL_DISPLAY = "Kusto URL";
static final String KUSTO_AUTH_USERNAME_CONF = "kusto.auth.username";
@@ -73,39 +58,12 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DOC = "Kusto sink maximum staleness in milliseconds (per topic and partition).";
private static final String KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY = "Maximum Flush Interval";
static final String KUSTO_SINK_ERROR_TOLERANCE_CONF = "error.tolerance";
private static final String KUSTO_SINK_ERROR_TOLERANCE_DOC = "Error tolerance setting. "
+ "Must be configured to one of the following:\n"
+ "``NONE``\n"
+ "The Connector throws ConnectException and stops processing records "
+ "when an error occurs while processing or ingesting records into KustoDB.\n"
+ "``ALL``\n"
+ "Continues to process next subsequent records "
+ "when error occurs while processing or ingesting records into KustoDB.\n";
private static final String KUSTO_SINK_ERROR_TOLERANCE_DISPLAY = "Error Tolerance";
// Deprecated configs
static final String KUSTO_TABLES_MAPPING_CONF_DEPRECATED = "kusto.tables.topics_mapping";
static final String KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED = "kusto.sink.flush_size";
static final String KUSTO_SINK_FLUSH_INTERVAL_MS_CONF_DEPRECATED = "kusto.sink.flush_interval_ms";
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "dlq.bootstrap.servers";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC = "Configure this with the Kafka broker address(es) "
+ "to which the Connector should write failed records.";
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Dead-Letter Queue Bootstrap Servers";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "dlq.topic.name";
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the name of the Kafka topic "
+ "to which failed records should be written. "
+ "By default, the Connector writes failed records to {$origin-topic}-failed. "
+ "If the topic does not already exist, the Connector creates it with a replication factor of 1. "
+ "To use different settings, create the topic from the CLI before running the Connector.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Dead-Letter Queue Topic Name";
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "max.retry.time.ms";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time up to which the Connector "
+ "should retry writing records to KustoDB in case of failures.";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY = "Maximum Retry Time";
static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF = "retry.backoff.time.ms";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC = "Backoff time between the retry attempts "
+ "the Connector makes to ingest records into KustoDB.";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY = "Retry BackOff Time";
private static final String DEPRECATED_CONFIG_DOC = " This configuration has been deprecated.";
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -120,11 +78,9 @@ public class KustoSinkConfig extends AbstractConfig {
final String connectionGroupName = "Connection";
final String writeGroupName = "Writes";
final String errorAndRetriesGroupName = "Error Handling and Retries";
int connectionGroupOrder = 0;
int writeGroupOrder = 0;
int errorAndRetriesGroupOrder = 0;
//TODO: Add display name, validators, recommenders to configs.
return new ConfigDef()
@@ -191,13 +147,23 @@ public class KustoSinkConfig extends AbstractConfig {
.define(
KUSTO_TABLES_MAPPING_CONF,
Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
null,
Importance.HIGH,
KUSTO_TABLES_MAPPING_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_TABLES_MAPPING_DISPLAY)
.define(
KUSTO_TABLES_MAPPING_CONF_DEPRECATED,
Type.STRING,
null,
Importance.HIGH,
KUSTO_TABLES_MAPPING_DOC + DEPRECATED_CONFIG_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_TABLES_MAPPING_DISPLAY)
.define(
KUSTO_SINK_TEMP_DIR_CONF,
Type.STRING,
@@ -219,6 +185,17 @@ public class KustoSinkConfig extends AbstractConfig {
writeGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_FLUSH_SIZE_BYTES_DISPLAY)
.define(
KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED,
Type.LONG,
FileUtils.ONE_MB,
ConfigDef.Range.atLeast(100),
Importance.MEDIUM,
KUSTO_SINK_FLUSH_SIZE_BYTES_DOC + DEPRECATED_CONFIG_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_FLUSH_SIZE_BYTES_DISPLAY)
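// Note (added for clarity, not in the original commit): FileUtils.ONE_MB is 1,048,576 bytes,
// so the deprecated flush size keeps its old default of 1 MiB and still rejects values below 100 bytes.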
.define(
KUSTO_SINK_FLUSH_INTERVAL_MS_CONF,
Type.LONG,
@@ -231,56 +208,16 @@ public class KustoSinkConfig extends AbstractConfig {
Width.MEDIUM,
KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY)
.define(
KUSTO_SINK_ERROR_TOLERANCE_CONF,
Type.STRING,
ErrorTolerance.NONE.name(),
ConfigDef.ValidString.in("ALL", "NONE"),
Importance.LOW,
KUSTO_SINK_ERROR_TOLERANCE_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.LONG,
KUSTO_SINK_ERROR_TOLERANCE_DISPLAY)
.define(
KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF,
Type.STRING,
null,
Importance.LOW,
KUSTO_DLQ_BOOTSTRAP_SERVERS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY)
.define(
KUSTO_DLQ_TOPIC_NAME_CONF,
Type.STRING,
null,
Importance.LOW,
KUSTO_DLQ_TOPIC_NAME_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_DLQ_TOPIC_NAME_DISPLAY)
.define(
KUSTO_SINK_MAX_RETRY_TIME_MS_CONF,
Type.LONG,
TimeUnit.SECONDS.toMillis(300),
Importance.LOW,
KUSTO_SINK_MAX_RETRY_TIME_MS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY)
.define(
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF,
Type.LONG,
TimeUnit.SECONDS.toMillis(10),
Importance.LOW,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC,
errorAndRetriesGroupName,
errorAndRetriesGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY);
.define(
KUSTO_SINK_FLUSH_INTERVAL_MS_CONF_DEPRECATED,
Type.LONG,
TimeUnit.SECONDS.toMillis(300),
ConfigDef.Range.atLeast(100),
Importance.HIGH,
KUSTO_SINK_FLUSH_INTERVAL_MS_DOC + DEPRECATED_CONFIG_DOC,
writeGroupName,
writeGroupOrder++,
Width.MEDIUM,
KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY);
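// Compatibility note (illustrative, not part of the commit): the deprecated keys registered above
// (kusto.tables.topics_mapping, kusto.sink.flush_size, kusto.sink.flush_interval_ms) stay valid,
// and the getters below fall back to them, so existing connector configurations keep working while
// new deployments can use the non-deprecated KUSTO_TABLES_MAPPING_CONF, KUSTO_SINK_FLUSH_SIZE_BYTES_CONF
// and KUSTO_SINK_FLUSH_INTERVAL_MS_CONF settings instead.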
}
@@ -309,7 +246,10 @@ public class KustoSinkConfig extends AbstractConfig {
}
public String getTopicToTableMapping() {
return this.getString(KUSTO_TABLES_MAPPING_CONF);
// Prefer the new config if it is set; otherwise fall back to the deprecated config
return (!Strings.isNullOrEmpty(getString(KUSTO_TABLES_MAPPING_CONF)))
? getString(KUSTO_TABLES_MAPPING_CONF)
: getString(KUSTO_TABLES_MAPPING_CONF_DEPRECATED);
}
public String getTempDirPath() {
@@ -317,35 +257,21 @@ public class KustoSinkConfig extends AbstractConfig {
}
public long getFlushSizeBytes() {
return this.getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF);
// If the deprecated config is not set to default
return (getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED) != FileUtils.ONE_MB)
? getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED)
: getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF);
}
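// Illustrative usage sketch (assumes the constructor shown above and getConfig(); not part of this commit):
//   Map<String, String> props = new HashMap<>();
//   props.put(KUSTO_URL_CONF, "https://ingest-mycluster.kusto.windows.net");      // hypothetical cluster
//   props.put(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF_DEPRECATED, "5242880");            // old key still accepted
//   KustoSinkConfig config = new KustoSinkConfig(getConfig(), props);
//   config.getFlushSizeBytes();  // 5242880, because the deprecated value differs from its 1 MiB default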
public long getFlushInterval() {
return this.getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF);
// If the deprecated config is not set to default
return (getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF_DEPRECATED) != TimeUnit.SECONDS.toMillis(300))
? getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF_DEPRECATED)
: getLong(KUSTO_SINK_FLUSH_INTERVAL_MS_CONF);
}
public String getErrorTolerance() {
return this.getString(KUSTO_SINK_TEMP_DIR_CONF);
}
public String getDlqBootstrapServers() {
return this.getString(KUSTO_SINK_TEMP_DIR_CONF);
}
public String getDlqTopicName() {
return this.getString(KUSTO_SINK_TEMP_DIR_CONF);
}
public long getMaxRetryTime() {
return this.getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF);
}
public long getBackOffTime() {
return this.getLong(KUSTO_SINK_FLUSH_SIZE_BYTES_CONF);
}
public static void main(String[] args) {
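// Prints the ConfigDef as enriched reStructuredText, handy for generating the connector's configuration reference.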
System.out.println(getConfig().toEnrichedRst());
}
}

View file

@@ -41,14 +41,19 @@ import java.util.Set;
* Currently only ingested files are "committed" in the sense that we can advance the offset according to it.
*/
public class KustoSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);
static final int ROLE_INDEX = 0;
static final int PRINCIPAL_DISPLAY_NAME_INDEX = 2;
private final Set<TopicPartition> assignment;
private Map<String, TopicIngestionProperties> topicsToIngestionProps;
private long maxFileSize;
private long flushInterval;
private String tempDir;
IngestClient kustoIngestClient;
Map<TopicPartition, TopicPartitionWriter> writers;
public KustoSinkTask() {
assignment = new HashSet<>();
@@ -95,7 +100,7 @@ public class KustoSinkTask extends SinkTask {
public static Client createKustoEngineClient(KustoSinkConfig config) {
try {
String engineClientURL = "https://" + StringUtils.removeStart(config.getKustoUrl(),"https://ingest-");
String engineClientURL = config.getKustoUrl().replace("https://ingest-", "https://");
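// For example (hypothetical cluster): https://ingest-mycluster.region.kusto.windows.net -> https://mycluster.region.kusto.windows.net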
if (!Strings.isNullOrEmpty(config.getKustoAuthAppid())) {
if (Strings.isNullOrEmpty(config.getAuthAppkey())) {
throw new ConfigException("Kusto authentication missing App Key.");
@@ -129,7 +134,7 @@ public class KustoSinkTask extends SinkTask {
}
}
public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) throws ConfigException {
public static Map<String, TopicIngestionProperties> getTopicsToIngestionProps(KustoSinkConfig config) {
Map<String, TopicIngestionProperties> result = new HashMap<>();
try {
@@ -140,55 +145,48 @@ public class KustoSinkTask extends SinkTask {
JSONObject mapping = mappings.getJSONObject(i);
try {
String db = mapping.getString("db");
String table = mapping.getString("table");
String format = mapping.optString("format");
CompressionType compressionType = StringUtils.isBlank(mapping.optString("eventDataCompression")) ? null : CompressionType.valueOf(mapping.optString("eventDataCompression"));
IngestionProperties props = new IngestionProperties(db, table);
if (format != null && !format.isEmpty()) {
if (format.equals("json") || format.equals("singlejson")){
props.setDataFormat("multijson");
}
props.setDataFormat(format);
}
String mappingRef = mapping.optString("mapping");
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null) {
if (format.equals(IngestionProperties.DATA_FORMAT.json.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.json);
} else if (format.equals(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.avro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.parquet);
} else if (format.equals(IngestionProperties.DATA_FORMAT.orc.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.orc);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.csv);
}
}
}
TopicIngestionProperties topicIngestionProperties = new TopicIngestionProperties();
topicIngestionProperties.eventDataCompression = compressionType;
topicIngestionProperties.ingestionProperties = props;
result.put(mapping.getString("topic"), topicIngestionProperties);
} catch (Exception ex) {
throw new ConfigException("Malformed topics to kusto ingestion props mappings", ex);
}
}
return result;
}
catch (Exception ex) {
throw new ConfigException(String.format("Error trying to parse kusto ingestion props %s",ex.getMessage()));
throw new ConfigException("Error while parsing kusto ingestion properties.", ex);
}
throw new ConfigException("Malformed topics to kusto ingestion props mappings");
}
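// Example of the expected topic-to-table mapping value (illustrative values, not from the commit):
//   [{"topic": "events", "db": "mydb", "table": "events_table", "format": "json", "mapping": "events_mapping", "eventDataCompression": "gz"}]
// Only "topic", "db" and "table" are required by the parser above; "format", "mapping" and
// "eventDataCompression" are read with optString and may be omitted.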
public TopicIngestionProperties getIngestionProps(String topic) {
@@ -204,22 +202,20 @@ public class KustoSinkTask extends SinkTask {
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
for (int i = 0; i < mappings.length(); i++) {
JSONObject mapping = mappings.getJSONObject(i);
String db = mapping.getString("db");
String table = mapping.getString("table");
validateTableAccess(config, engineClient, mapping, databaseTableErrorList, accessErrorList);
}
}
String tableAccessErrorMessage ="";
String tableAccessErrorMessage = "";
if(!databaseTableErrorList.isEmpty())
{
tableAccessErrorMessage = "\n\n Unable to access the following database:table \n " +
tableAccessErrorMessage = "\n\nUnable to access the following database:table\n" +
String.join("\n",databaseTableErrorList);
}
if(!accessErrorList.isEmpty())
{
tableAccessErrorMessage = tableAccessErrorMessage + "\n\nUser does not have appropriate permissions " +
"to sink data into the Kusto database:table combinations. " +
"to sink data into the Kusto database:table combination(s). " +
"Verify your Kusto principals and roles before proceeding for the following: \n " +
String.join("\n",accessErrorList);
}
@@ -240,8 +236,6 @@ public class KustoSinkTask extends SinkTask {
* @param mapping JSON Object containing a Table mapping.
*/
private static void validateTableAccess(KustoSinkConfig config, Client engineClient, JSONObject mapping, List<String> databaseTableErrorList, List<String> accessErrorList) throws JSONException {
int ROLE_INDEX = 0;
int PRINCIPAL_DISPLAY_NAME_INDEX = 2;
String getPrincipalsQuery = ".show table %s principals";
String database = mapping.getString("db");
String table = mapping.getString("table");
@@ -271,6 +265,7 @@ public class KustoSinkTask extends SinkTask {
} catch (DataClientException e) {
throw new ConnectException("Unable to connect to ADX(Kusto) instance", e);
} catch (DataServiceException e) {
// Logging the error so that the trace is not lost.
log.error("The table {} in database {} might not exist or access denied ", table, database, e);
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
}
@@ -282,7 +277,7 @@ public class KustoSinkTask extends SinkTask {
}
@Override
public void open(Collection<TopicPartition> partitions) throws ConnectException {
public void open(Collection<TopicPartition> partitions) {
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
TopicIngestionProperties ingestionProps = getIngestionProps(tp.topic());
@@ -323,7 +318,8 @@ public class KustoSinkTask extends SinkTask {
tempDir = config.getTempDirPath();
maxFileSize = config.getFlushSizeBytes();
flushInterval = config.getFlushInterval();
log.info(String.format("Kafka Kusto Sink started. target cluster: (%s), source topics: (%s)", url, topicsToIngestionProps.keySet().toString()));
log.info(String.format("Started KustoSinkTask with target cluster: (%s), source topics: (%s)",
url, topicsToIngestionProps.keySet().toString()));
open(context.assignment());
}
@@ -334,7 +330,7 @@ public class KustoSinkTask extends SinkTask {
writer.close();
}
try {
if(kustoIngestClient!=null) {
if(kustoIngestClient != null) {
kustoIngestClient.close();
}
} catch (IOException e) {