* fix int cant be cast to long

* multijson support
and fix int cant be cast to long

* v1.0.3

Co-authored-by: Ohad Bitton <ohbitton@microsoft.com>
This commit is contained in:
ohad bitton 2020-10-13 00:22:20 +03:00 коммит произвёл GitHub
Родитель 3ca9dc9052
Коммит e4d15149a1
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 17 добавлений и 10 удалений

Просмотреть файл

@ -9,7 +9,7 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
<version>1.0.2</version>
<version>1.0.3</version>
<properties>
<kafka.version>1.0.0</kafka.version>
<json.version>20090211</json.version>

Просмотреть файл

@ -1,10 +1,7 @@
package com.microsoft.azure.kusto.kafka.connect.sink;
import com.google.common.base.Strings;
import com.microsoft.azure.kusto.data.Client;
import com.microsoft.azure.kusto.data.ClientFactory;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import com.microsoft.azure.kusto.ingest.IngestClient;
@ -149,7 +146,7 @@ public class KustoSinkTask extends SinkTask {
if (mappingRef != null && !mappingRef.isEmpty()) {
if (format != null) {
if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.json.toString())){
if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) {
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Json);
} else if (format.equalsIgnoreCase(IngestionProperties.DATA_FORMAT.avro.toString())){
props.setIngestionMapping(mappingRef, IngestionMapping.IngestionMappingKind.Avro);
@ -241,11 +238,16 @@ public class KustoSinkTask extends SinkTask {
String table = mapping.getString("table");
String format = mapping.getString("format");
String mappingName = mapping.getString("mapping");
if (format.equalsIgnoreCase("json") || format.equalsIgnoreCase("singlejson") || format.equalsIgnoreCase("multijson")) {
format = "json";
}
boolean hasAccess = false;
try {
try {
KustoOperationResult rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table));
if ((long) rs.getPrimaryResults().getData().get(0).get(0) >= 0) {
KustoResultSetTable rs = engineClient.execute(database, String.format(FETCH_TABLE_QUERY, table)).getPrimaryResults();
rs.next();
if (rs.getLong(0) >= 0) {
hasAccess = true;
}
@ -277,8 +279,13 @@ public class KustoSinkTask extends SinkTask {
}
} catch (DataServiceException e) {
// Logging the error so that the trace is not lost.
log.error("{}", e);
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
if (!e.getCause().toString().contains("Forbidden")){
log.error("{}", e);
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
} else {
log.warn("Failed to check permissions, will continue the run as the principal might still be able to ingest: {}", e);
}
}
}
} catch (DataClientException e) {