This commit is contained in:
Ohad Bitton 2020-06-15 22:03:20 +03:00
Родитель 820f95cc1d
Коммит e5ec114092
3 изменённых файлов: 11 добавлений и 13 удалений

Просмотреть файл

@ -296,15 +296,15 @@ public class KustoSinkConfig extends AbstractConfig {
}
public String getKustoAuthAppid() {
return this.getString(KUSTO_AUTH_APPID_CONF);
return this.getPassword(KUSTO_AUTH_APPID_CONF).value();
}
public String getAuthAppkey() {
return this.getString(KUSTO_AUTH_APPKEY_CONF);
return this.getPassword(KUSTO_AUTH_APPKEY_CONF).value();
}
public String getAuthAuthority() {
return this.getString(KUSTO_AUTH_AUTHORITY_CONF);
return this.getPassword(KUSTO_AUTH_AUTHORITY_CONF).value();
}
public String getTopicToTableMapping() {

Просмотреть файл

@ -105,15 +105,15 @@ public class KustoSinkTask extends SinkTask {
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null) {
if (format.equals(IngestionProperties.DATA_FORMAT.json.toString())){
if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
} else if (format.equals(IngestionProperties.DATA_FORMAT.avro.toString())){
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.apacheavro.toString())){
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.apacheavro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.ApacheAvro);
} else if (format.equals(IngestionProperties.DATA_FORMAT.parquet.toString())) {
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.parquet.toString())) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Parquet);
} else if (format.equals(IngestionProperties.DATA_FORMAT.orc.toString())){
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.orc.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Orc);
} else {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Csv);

Просмотреть файл

@ -27,7 +27,7 @@ class TopicPartitionWriter {
private final IngestionProperties ingestionProps;
private final String basePath;
private final long flushInterval;
private boolean commitImmediately;
private boolean commitImmediately = false;
private final long fileThreshold;
FileWriter fileWriter;
long currentOffset;
@ -84,10 +84,8 @@ class TopicPartitionWriter {
return null;
}
String getFilePath(@Nullable Long offset) {
String getFilePath(Long offset) {
// Should be null if flushed by interval
offset = offset == null ? currentOffset : offset;
long nextOffset = fileWriter != null && fileWriter.isDirty() ? offset + 1 : offset;
String compressionExtension = "";
if (shouldCompressData(ingestionProps, null) || eventDataCompression != null) {
@ -98,7 +96,7 @@ class TopicPartitionWriter {
}
}
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), nextOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
return Paths.get(basePath, String.format("kafka_%s_%s_%d.%s%s", tp.topic(), tp.partition(), currentOffset, ingestionProps.getDataFormat(), compressionExtension)).toString();
}
void writeRecord(SinkRecord record) throws ConnectException {