зеркало из
1
0
Форкнуть 0
This commit is contained in:
Daniel Dubovski 2018-09-03 09:52:35 +03:00
Родитель 1ddad12e05
Коммит 2534c39101
2 изменённых файлов: 20 добавлений и 2 удалений

Просмотреть файл

@ -7,8 +7,13 @@ import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream;
/**
* This class is used to write gzipped rolling files.
* Currently supports size based rolling, where size is for *uncompressed* size,
* so final size can vary.
*/
public class GZIPFileWriter implements Closeable {
// callbacks
private Consumer<GZIPFileDescriptor> onRollCallback;
private Supplier<String> getFilePath;
@ -18,8 +23,16 @@ public class GZIPFileWriter implements Closeable {
private CountingOutputStream fileStream;
private long fileThreshold;
/**
* @param basePath - This is path to which to write the files to.
* @param fileThreshold - Max size, uncompressed bytes.
* @param onRollCallback - Callback to allow code to execute when rolling a file. Blocking code.
* @param getFilePath - Allow external resolving of file name.
*/
public GZIPFileWriter(String basePath, long fileThreshold,
Consumer<GZIPFileDescriptor> onRollCallback, Supplier<String> getFilePath) {
Consumer<GZIPFileDescriptor> onRollCallback,
Supplier<String> getFilePath) {
this.getFilePath = getFilePath;
this.basePath = basePath;
this.fileThreshold = fileThreshold;

Просмотреть файл

@ -19,6 +19,11 @@ import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Kusto sink uses file system to buffer records.
* Every time a file is rolled, we used the kusto client to ingest it.
* Currently only ingested files are "commited" in the sense that we can advance the offset according to it.
*/
public class KustoSinkTask extends SinkTask {
static final String TOPICS_WILDCARD = "*";
private static final Logger log = LoggerFactory.getLogger(KustoSinkTask.class);