REsolve the pr comments and clean the code
This commit is contained in:
Родитель
95a4fbc7b0
Коммит
c990e58405
|
@ -193,7 +193,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
Client engineClient = createKustoEngineClient(config);
|
||||
if (config.getTopicToTableMapping() != null) {
|
||||
JSONArray mappings = new JSONArray(config.getTopicToTableMapping());
|
||||
if(mappings.length()>0) {
|
||||
if(mappings.length() > 0) {
|
||||
if(isIngestorRole(mappings.getJSONObject(0), engineClient)) {
|
||||
for (int i = 0; i < mappings.length(); i++) {
|
||||
JSONObject mapping = mappings.getJSONObject(i);
|
||||
|
@ -225,7 +225,7 @@ public class KustoSinkTask extends SinkTask {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
|
||||
private boolean isIngestorRole(JSONObject testMapping, Client engineClient) throws JSONException {
|
||||
String database = testMapping.getString("db");
|
||||
String table = testMapping.getString("table");
|
||||
try {
|
||||
|
|
|
@ -57,7 +57,7 @@ class TopicPartitionWriter {
|
|||
private final BehaviorOnError behaviorOnError;
|
||||
|
||||
TopicPartitionWriter(TopicPartition tp, IngestClient client, TopicIngestionProperties ingestionProps,
|
||||
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> kafkaProducer)
|
||||
KustoSinkConfig config, boolean isDlqEnabled, String dlqTopicName, Producer<byte[], byte[]> dlqProducer)
|
||||
{
|
||||
this.tp = tp;
|
||||
this.client = client;
|
||||
|
@ -73,7 +73,7 @@ class TopicPartitionWriter {
|
|||
this.behaviorOnError = config.getBehaviorOnError();
|
||||
this.isDlqEnabled = isDlqEnabled;
|
||||
this.dlqTopicName = dlqTopicName;
|
||||
this.kafkaProducer = kafkaProducer;
|
||||
this.kafkaProducer = dlqProducer;
|
||||
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче