Connecter task subdirectory
This commit is contained in:
Родитель
4667c071ef
Коммит
ea21c11ed9
|
@ -123,14 +123,15 @@ public class KustoSinkConfig extends AbstractConfig {
|
|||
|
||||
private String createAndReturnTempDirPath() {
|
||||
String systemTempDirPath = getString(KUSTO_SINK_TEMP_DIR_CONF);
|
||||
String tempDir = systemTempDirPath + "-" + UUID.randomUUID().toString();
|
||||
Path path = Paths.get(tempDir);
|
||||
String tempDir = "kusto-sink-connector-" + UUID.randomUUID().toString();
|
||||
Path path = Paths.get(systemTempDirPath,tempDir);
|
||||
|
||||
try {
|
||||
Files.createDirectories(path);
|
||||
} catch (IOException e) {
|
||||
throw new ConfigException("Failed to create temp directory="+tempDir, e);
|
||||
}
|
||||
return tempDir;
|
||||
return path.toString();
|
||||
}
|
||||
|
||||
public KustoSinkConfig(Map<String, String> parsedConfig) {
|
||||
|
|
|
@ -11,6 +11,7 @@ import com.microsoft.azure.kusto.ingest.IngestionMapping;
|
|||
import com.microsoft.azure.kusto.ingest.IngestionProperties;
|
||||
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
|
||||
import com.microsoft.azure.kusto.ingest.source.CompressionType;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
|
@ -26,6 +27,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import org.testng.util.Strings;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -273,8 +275,11 @@ public class KustoSinkTask extends SinkTask {
|
|||
writers.get(tp).close();
|
||||
writers.remove(tp);
|
||||
assignment.remove(tp);
|
||||
FileUtils.deleteDirectory(new File(config.getTempDirPath()));
|
||||
} catch (ConnectException e) {
|
||||
log.error("Error closing writer for {}. Error: {}", tp, e);
|
||||
} catch (IOException e) {
|
||||
log.error("Unable to delete temporary connector folder {}", config.getTempDirPath());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче