зеркало из
1
0
Форкнуть 0

Improve "Table not found" error message to be clearer and include exception message

This commit is contained in:
Yihezkel Schoenbrun 2021-06-24 00:40:06 +03:00
Родитель 4dc27c331f
Коммит 8871cbd483
1 изменённых файлов: 7 добавлений и 7 удалений

Просмотреть файл

@ -240,7 +240,7 @@ public class KustoSinkTask extends SinkTask {
hasAccess = true; hasAccess = true;
} }
} catch (DataServiceException e) { } catch (DataServiceException e) {
databaseTableErrorList.add(String.format("Database:%s Table:%s | table not found", database, table)); databaseTableErrorList.add(String.format("Caught exception while trying to validate access to Database '%s' Table '%s', with message '%s'", database, table, e.getMessage()));
} }
if (hasAccess) { if (hasAccess) {
try { try {
@ -268,7 +268,7 @@ public class KustoSinkTask extends SinkTask {
log.error("Error fetching principal roles with query {}", query, e); log.error("Error fetching principal roles with query {}", query, e);
databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table)); databaseTableErrorList.add(String.format("Database:%s Table:%s", database, table));
} else { } else {
log.warn("Failed to check permissions, will continue the run as the principal might still be able to ingest: {}", e); log.warn("Failed to check permissions with query '{}', will continue the run as the principal might still be able to ingest", query, e);
} }
} }
} }
@ -365,11 +365,11 @@ public class KustoSinkTask extends SinkTask {
@Override @Override
public void put(Collection<SinkRecord> records) { public void put(Collection<SinkRecord> records) {
SinkRecord lastRecord = null; SinkRecord lastRecord = null;
for (SinkRecord record : records) { for (SinkRecord sinkRecord : records) {
log.debug("Record to topic: {}", record.topic()); log.debug("Record to topic: {}", sinkRecord.topic());
lastRecord = record; lastRecord = sinkRecord;
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition()); TopicPartition tp = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp); TopicPartitionWriter writer = writers.get(tp);
if (writer == null) { if (writer == null) {
@ -379,7 +379,7 @@ public class KustoSinkTask extends SinkTask {
throw e; throw e;
} }
writer.writeRecord(record); writer.writeRecord(sinkRecord);
} }
if (lastRecord != null) { if (lastRecord != null) {