зеркало из
1
0
Форкнуть 0
This commit is contained in:
fahad 2020-06-25 21:38:54 +05:30
Родитель f81d7be15c
Коммит 86c62f246f
2 изменённых файлов: 20 добавлений и 13 удалений

Просмотреть файл

@ -114,8 +114,23 @@ public class KustoSinkConfig extends AbstractConfig {
+ "the Connector makes to ingest records into Kusto table.";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY = "Errors Retry BackOff Time";
private final String tempDirPath;
public KustoSinkConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
tempDirPath = createAndReturnTempDirPath();
}
private String createAndReturnTempDirPath() {
String systemTempDirPath = getString(KUSTO_SINK_TEMP_DIR_CONF);
String tempDir = systemTempDirPath + "-" + UUID.randomUUID().toString();
Path path = Paths.get(tempDir);
try {
Files.createDirectories(path);
} catch (IOException e) {
throw new ConfigException("Failed to create temp directory="+tempDir, e);
}
return tempDir;
}
public KustoSinkConfig(Map<String, String> parsedConfig) {
@ -316,14 +331,6 @@ public class KustoSinkConfig extends AbstractConfig {
}
public String getTempDirPath() {
String systemTempDirPath = getString(KUSTO_SINK_TEMP_DIR_CONF);
String tempDirPath = systemTempDirPath + UUID.randomUUID().toString();
Path path = Paths.get(tempDirPath);
try {
Files.createDirectories(path);
} catch (IOException e) {
throw new ConfigException("Failed to create temp directory="+tempDirPath);
}
return tempDirPath;
}

Просмотреть файл

@ -101,7 +101,7 @@ public class TopicPartitionWriterTest {
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockClient, props, config);
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_0.csv.gz").toString());
Assert.assertEquals(writer.getFilePath(null), Paths.get(config.getTempDirPath(), "kafka_testTopic_11_0.csv.gz").toString());
}
@Test
@ -127,7 +127,7 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
Assert.assertEquals(writer.getFilePath(null), Paths.get(basePath, "kafka_testTopic_11_5.csv.gz").toString());
Assert.assertEquals(writer.getFilePath(null), Paths.get(config.getTempDirPath(), "kafka_testTopic_11_5.csv.gz").toString());
}
@Test
@ -199,7 +199,7 @@ public class TopicPartitionWriterTest {
writer.writeRecord(record);
}
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 3, IngestionProperties.DATA_FORMAT.csv.name())).toString());
Assert.assertEquals(writer.fileWriter.currentFile.path, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 3, IngestionProperties.DATA_FORMAT.csv.name())).toString());
writer.close();
}
@ -236,7 +236,7 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(writer.currentOffset, 16);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString());
Assert.assertEquals(currentFileName, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s.gz", tp.topic(), tp.partition(), 15, IngestionProperties.DATA_FORMAT.csv.name())).toString());
// Read
writer.fileWriter.finishFile(false);
@ -279,7 +279,7 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(writer.currentOffset, 10);
String currentFileName = writer.fileWriter.currentFile.path;
Assert.assertEquals(currentFileName, Paths.get(basePath, String.format("kafka_%s_%d_%d.%s", tp.topic(), tp.partition(), 10, IngestionProperties.DATA_FORMAT.avro.name())).toString());
Assert.assertEquals(currentFileName, Paths.get(config.getTempDirPath(), String.format("kafka_%s_%d_%d.%s", tp.topic(), tp.partition(), 10, IngestionProperties.DATA_FORMAT.avro.name())).toString());
writer.close();
}