This commit is contained in:
Ohad Bitton 2019-11-26 10:51:26 +02:00
Родитель a5a7512cd4
Коммит ccc106d92d
4 изменённых файлов: 24 добавлений и 30 удалений

Просмотреть файл

@ -120,12 +120,6 @@
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<!--FIXME:this is here only until I map this project to the actual Kusto Java SDK-->
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>

Просмотреть файл

@ -23,13 +23,12 @@ public class GZIPFileWriter implements Closeable {
public GZIPFileDescriptor currentFile;
private Timer timer;
private Consumer<GZIPFileDescriptor> onRollCallback;
private long flushInterval;
private final long flushInterval;
private Supplier<String> getFilePath;
private GZIPOutputStream gzipStream;
private String basePath;
private CountingOutputStream fileStream;
private long fileThreshold;
private final boolean flushImmediately;
/**
* @param basePath - This is path to which to write the files to.
@ -41,14 +40,12 @@ public class GZIPFileWriter implements Closeable {
long fileThreshold,
Consumer<GZIPFileDescriptor> onRollCallback,
Supplier<String> getFilePath,
long flushInterval,
boolean flushImmediately) {
long flushInterval) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;
this.onRollCallback = onRollCallback;
this.flushInterval = flushInterval;
this.flushImmediately = flushImmediately;
}
public boolean isDirty() {
@ -69,7 +66,7 @@ public class GZIPFileWriter implements Closeable {
currentFile.zippedBytes += fileStream.numBytes;
currentFile.numRecords++;
if (this.flushImmediately || (currentFile.rawBytes + data.length) > fileThreshold) {
if (this.flushInterval == 0 || (currentFile.rawBytes + data.length) > fileThreshold) {
rotate();
resetFlushTimer(true);
}
@ -137,22 +134,24 @@ public class GZIPFileWriter implements Closeable {
// Set shouldDestroyTimer to true if the current running task should be cancelled
private void resetFlushTimer(Boolean shouldDestroyTimer) {
if (shouldDestroyTimer) {
if (timer != null) {
timer.purge();
timer.cancel();
if (flushInterval > 0) {
if (shouldDestroyTimer) {
if (timer != null) {
timer.purge();
timer.cancel();
}
timer = new Timer(true);
}
timer = new Timer(true);
TimerTask t = new TimerTask() {
@Override
public void run() {
flushByTimeImpl();
}
};
timer.schedule(t, flushInterval);
}
TimerTask t = new TimerTask() {
@Override
public void run() {
flushByTimeImpl();
}
};
timer.schedule(t, flushInterval);
}
private void flushByTimeImpl() {

Просмотреть файл

@ -142,7 +142,7 @@ public class KustoSinkTask extends SinkTask {
assignment.addAll(partitions);
for (TopicPartition tp : assignment) {
IngestionProperties ingestionProps = getIngestionProps(tp.topic());
log.warn("Open Kusto topic: " + tp.toString());
log.debug(String.format("Open Kusto topic: '%s' with partition: '%s'", tp.topic(), tp.partition()));
if (ingestionProps == null) {
throw new ConnectException(String.format("Kusto Sink has no ingestion props mapped for the topic: %s. please check your configuration.", tp.topic()));
} else {
@ -202,7 +202,7 @@ public class KustoSinkTask extends SinkTask {
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
for (SinkRecord record : records) {
log.warn("record to topic:" + record.topic());
log.debug("record to topic:" + record.topic());
TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
TopicPartitionWriter writer = writers.get(tp);

Просмотреть файл

@ -89,9 +89,10 @@ public class TopicPartitionWriter {
}
public void open() {
gzipFileWriter = new GZIPFileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath, flushInterval,
ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString()) ||
ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString()));
boolean flushImmediately = ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.avro.toString()) ||
ingestionProps.getDataFormat().equals(IngestionProperties.DATA_FORMAT.parquet.toString());
gzipFileWriter = new GZIPFileWriter(basePath, fileThreshold, this::handleRollFile, this::getFilePath, flushImmediately ? 0: flushInterval);
}
public void close() {