Merge branch 'master' into addOptionalEngineUrlConfig
This commit is contained in:
Коммит
5715669523
2
pom.xml
2
pom.xml
|
@ -9,7 +9,7 @@
|
||||||
<artifactId>kafka-sink-azure-kusto</artifactId>
|
<artifactId>kafka-sink-azure-kusto</artifactId>
|
||||||
<packaging>jar</packaging>
|
<packaging>jar</packaging>
|
||||||
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
|
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
|
||||||
<version>1.0.2</version>
|
<version>1.0.3</version>
|
||||||
<properties>
|
<properties>
|
||||||
<kafka.version>1.0.0</kafka.version>
|
<kafka.version>1.0.0</kafka.version>
|
||||||
<json.version>20090211</json.version>
|
<json.version>20090211</json.version>
|
||||||
|
|
|
@ -146,7 +146,7 @@ public class KustoSinkTask extends SinkTask {
|
||||||
String mappingRef = mapping.optString(MAPPING);
|
String mappingRef = mapping.optString(MAPPING);
|
||||||
|
|
||||||
if (mappingRef != null && !mappingRef.isEmpty() && format != null) {
|
if (mappingRef != null && !mappingRef.isEmpty() && format != null) {
|
||||||
if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())) {
|
if (format.equalsIgnoreCase(JSON_FORMAT) || format.equalsIgnoreCase(SINGLEJSON_FORMAT) || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
|
||||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
|
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
|
||||||
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())) {
|
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())) {
|
||||||
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
|
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
|
||||||
|
@ -232,6 +232,9 @@ public class KustoSinkTask extends SinkTask {
|
||||||
String table = mapping.getString(MAPPING_TABLE);
|
String table = mapping.getString(MAPPING_TABLE);
|
||||||
String format = mapping.getString(MAPPING_FORMAT);
|
String format = mapping.getString(MAPPING_FORMAT);
|
||||||
String mappingName = mapping.getString(MAPPING);
|
String mappingName = mapping.getString(MAPPING);
|
||||||
|
if (format.equalsIgnoreCase(JSON_FORMAT) || format.equalsIgnoreCase(SINGLEJSON_FORMAT) || format.equalsIgnoreCase(MULTIJSON_FORMAT)) {
|
||||||
|
format = JSON_FORMAT;
|
||||||
|
}
|
||||||
boolean hasAccess = false;
|
boolean hasAccess = false;
|
||||||
try {
|
try {
|
||||||
try {
|
try {
|
||||||
|
@ -264,8 +267,12 @@ public class KustoSinkTask extends SinkTask {
|
||||||
}
|
}
|
||||||
} catch (DataServiceException e) {
|
} catch (DataServiceException e) {
|
||||||
// Logging the error so that the trace is not lost.
|
// Logging the error so that the trace is not lost.
|
||||||
log.error("Error fetching principal roles with query {}", query, e);
|
if (!e.getCause().toString().contains("Forbidden")){
|
||||||
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
|
log.error("Error fetching principal roles with query {}", query, e);
|
||||||
|
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
|
||||||
|
} else {
|
||||||
|
log.warn("Failed to check permissions, will continue the run as the principal might still be able to ingest: {}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (DataClientException e) {
|
} catch (DataClientException e) {
|
||||||
|
|
Загрузка…
Ссылка в новой задаче