зеркало из
1
0
Форкнуть 0
This commit is contained in:
hasher 2020-06-29 15:36:34 +05:30
Родитель a17e59fcaf
Коммит 10a722c165
3 изменённых файлов: 22 добавлений и 24 удалений

Просмотреть файл

@ -260,8 +260,6 @@ public class FileWriter implements Closeable {
currentFile.rawBytes = recordWriter.getDataSize();
currentFile.zippedBytes += countingStream.numBytes;
currentFile.numRecords++;
currentFile.zippedBytes = countingStream.numBytes;
currentFile.numRecords++;
if (this.flushInterval == 0 || currentFile.rawBytes > fileThreshold) {
rotate(record.kafkaOffset());
resetFlushTimer(true);
@ -289,7 +287,7 @@ public class FileWriter implements Closeable {
else if ((record.valueSchema() != null) && (record.valueSchema().type() == Schema.Type.BYTES)){
recordWriterProvider = new ByteRecordWriterProvider();
} else {
throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format",record.valueSchema().type()));
throw new ConnectException(String.format("Invalid Kafka record format, connector does not support %s format. This connector supports Avro, Json with schema, Json without schema, Byte, String format. ",record.valueSchema().type()));
}
}

Просмотреть файл

@ -69,10 +69,10 @@ public class KustoSinkTask extends SinkTask {
}
ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
config.getKustoUrl(),
config.getAuthAppid(),
config.getAuthAppkey(),
config.getAuthAuthority()
config.getKustoUrl(),
config.getAuthAppid(),
config.getAuthAppkey(),
config.getAuthAuthority()
);
kcsb.setClientVersionForTracing(Version.CLIENT_NAME + ":" + Version.getVersion());
@ -80,8 +80,8 @@ public class KustoSinkTask extends SinkTask {
}
throw new ConfigException("Failed to initialize KustoIngestClient, please " +
"provide valid credentials. Either Kusto username and password or " +
"Kusto appId, appKey, and authority should be configured.");
"provide valid credentials. Either Kusto username and password or " +
"Kusto appId, appKey, and authority should be configured.");
} catch (Exception e) {
throw new ConnectException("Failed to initialize KustoIngestClient", e);
}
@ -95,10 +95,10 @@ public class KustoSinkTask extends SinkTask {
throw new ConfigException("Kusto authentication missing App Key.");
}
ConnectionStringBuilder kcsb = ConnectionStringBuilder.createWithAadApplicationCredentials(
engineClientURL,
config.getAuthAppid(),
config.getAuthAppkey(),
config.getAuthAuthority()
engineClientURL,
config.getAuthAppid(),
config.getAuthAppkey(),
config.getAuthAuthority()
);
kcsb.setClientVersionForTracing(Version.CLIENT_NAME + ":" + Version.getVersion());
@ -106,8 +106,8 @@ public class KustoSinkTask extends SinkTask {
}
throw new ConfigException("Failed to initialize KustoEngineClient, please " +
"provide valid credentials. Either Kusto username and password or " +
"Kusto appId, appKey, and authority should be configured.");
"provide valid credentials. Either Kusto username and password or " +
"Kusto appId, appKey, and authority should be configured.");
} catch (Exception e) {
throw new ConnectException("Failed to initialize KustoEngineClient", e);
}

Просмотреть файл

@ -124,20 +124,20 @@ class TopicPartitionWriter {
long sleepTimeMs = retryBackOffTime;
log.error("Failed to ingest records into KustoDB, backing off and retrying ingesting records after {} milliseconds.", sleepTimeMs);
try {
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
TimeUnit.MILLISECONDS.sleep(sleepTimeMs);
} catch (InterruptedException interruptedErr) {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e);
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException(String.format("Retrying ingesting records into KustoDB was interuppted after retryAttempts=%s", retryAttempts+1), e);
}
} else {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
if (isDlqEnabled && behaviorOnError != BehaviorOnError.FAIL) {
log.warn("Writing {} failed records to DLQ topic={}", fileDescriptor.records.size(), dlqTopicName);
fileDescriptor.records.forEach(this::sendFailedRecordToDlq);
}
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e);
}
throw new ConnectException("Retry attempts exhausted, failed to ingest records into KustoDB.", e);
}
}