зеркало из
1
0
Форкнуть 0
This commit is contained in:
fahad 2020-06-25 15:47:03 +05:30
Родитель 6ec7e60e76
Коммит f4f97ccb55
7 изменённых файлов: 129 добавлений и 91 удалений

Просмотреть файл

@ -72,20 +72,20 @@ public class KustoSinkConfig extends AbstractConfig {
static final String KUSTO_BEHAVIOR_ON_ERROR_CONF = "behavior.on.error";
private static final String KUSTO_BEHAVIOR_ON_ERROR_DOC = "Behavior on error setting for "
+ "ingestion of records into KustoDB. "
+ "ingestion of records into Kusto table. "
+ "Must be configured to one of the following:\n"
+ "``fail``\n"
+ " Stops the connector when an error occurs "
+ "while processing records or ingesting records in KustoDB.\n"
+ "while processing records or ingesting records in Kusto table.\n"
+ "``ignore``\n"
+ " Continues to process next set of records "
+ "when error occurs while processing records or ingesting records in KustoDB.\n"
+ "when error occurs while processing records or ingesting records in Kusto table.\n"
+ "``log``\n"
+ " Logs the error message and continues to process subsequent records when an error occurs "
+ "while processing records or ingesting records in KustoDB, available in connect logs.";
+ "while processing records or ingesting records in Kusto table, available in connect logs.";
private static final String KUSTO_BEHAVIOR_ON_ERROR_DISPLAY = "Behavior On Error";
static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_CONF = "dlq.bootstrap.servers";
@ -95,18 +95,18 @@ public class KustoSinkConfig extends AbstractConfig {
private static final String KUSTO_DLQ_BOOTSTRAP_SERVERS_DISPLAY = "Dead-Letter Queue Bootstrap Servers";
static final String KUSTO_DLQ_TOPIC_NAME_CONF = "dlq.topic.name";
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to Kafka topic's name "
private static final String KUSTO_DLQ_TOPIC_NAME_DOC = "Set this to the Kafka topic's name "
+ "to which the failed records are to be sinked.";
private static final String KUSTO_DLQ_TOPIC_NAME_DISPLAY = "Dead-Letter Queue Topic Name";
static final String KUSTO_SINK_MAX_RETRY_TIME_MS_CONF = "errors.retry.max.time.ms";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DOC = "Maximum time upto which the Connector "
+ "should retry writing records to KustoDB in case of failures.";
+ "should retry writing records to Kusto table in case of failures.";
private static final String KUSTO_SINK_MAX_RETRY_TIME_MS_DISPLAY = "Errors Maximum Retry Time";
static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF = "errors.retry.backoff.time.ms";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC = "BackOff time between retry attempts "
+ "the Connector makes to ingest records into KustoDB.";
+ "the Connector makes to ingest records into Kusto table.";
private static final String KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY = "Errors Retry BackOff Time";
// Deprecated configs
@ -135,21 +135,20 @@ public class KustoSinkConfig extends AbstractConfig {
final String writeGroupName = "Writes";
final String errorAndRetriesGroupName = "Error Handling and Retries";
int connectionGroupOrder = 0;
int writeGroupOrder = 0;
int errorAndRetriesGroupOrder = 0;
ConfigDef result = new ConfigDef();
defineConnectionConfigs(connectionGroupName, connectionGroupOrder, result);
defineWriteConfigs(writeGroupName, writeGroupOrder, result);
defineErrorHandlingAndRetriesConfgis(errorAndRetriesGroupName, errorAndRetriesGroupOrder, result);
defineConnectionConfigs(connectionGroupName, result);
defineWriteConfigs(writeGroupName, result);
defineErrorHandlingAndRetriesConfgis(errorAndRetriesGroupName, result);
return result;
}
private static void defineErrorHandlingAndRetriesConfgis(final String errorAndRetriesGroupName,
int errorAndRetriesGroupOrder, ConfigDef result) {
ConfigDef result) {
int errorAndRetriesGroupOrder = 0;
result
.define(
KUSTO_BEHAVIOR_ON_ERROR_CONF,
@ -198,6 +197,7 @@ public class KustoSinkConfig extends AbstractConfig {
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_CONF,
Type.LONG,
TimeUnit.SECONDS.toMillis(10),
ConfigDef.Range.atLeast(1),
Importance.LOW,
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DOC,
errorAndRetriesGroupName,
@ -206,8 +206,10 @@ public class KustoSinkConfig extends AbstractConfig {
KUSTO_SINK_RETRY_BACKOFF_TIME_MS_DISPLAY);
}
private static void defineWriteConfigs(final String writeGroupName, int writeGroupOrder,
ConfigDef result) {
private static void defineWriteConfigs(final String writeGroupName, ConfigDef result) {
int writeGroupOrder = 0;
result
.define(
KUSTO_TABLES_MAPPING_CONF,
@ -285,8 +287,10 @@ public class KustoSinkConfig extends AbstractConfig {
KUSTO_SINK_FLUSH_INTERVAL_MS_DISPLAY);
}
private static void defineConnectionConfigs(final String connectionGroupName,
int connectionGroupOrder, ConfigDef result) {
private static void defineConnectionConfigs(final String connectionGroupName, ConfigDef result) {
int connectionGroupOrder = 0;
result
.define(
KUSTO_URL_CONF,

Просмотреть файл

@ -25,6 +25,10 @@ public class KustoSinkConnector extends SinkConnector {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks == 0) {
log.warn("No Connector tasks have been configured.");
}
List<Map<String, String>> configs = new ArrayList<>();
Map<String, String> taskProps = new HashMap<>();
taskProps.putAll(config.originalsStrings());

Просмотреть файл

@ -286,8 +286,7 @@ public class KustoSinkTask extends SinkTask {
config = new KustoSinkConfig(props);
String url = config.getKustoUrl();
// TODO : revisit the implementation
// validateTableMappings(config);
validateTableMappings(config);
topicsToIngestionProps = getTopicsToIngestionProps(config);

Просмотреть файл

@ -99,10 +99,12 @@ class TopicPartitionWriter {
log.info(String.format("Kusto ingestion: file (%s) of size (%s) at current offset (%s)", fileDescriptor.path, fileDescriptor.rawBytes, currentOffset));
this.lastCommittedOffset = currentOffset;
return;
} catch (IngestionServiceException | IngestionClientException exception) {
} catch (IngestionServiceException exception) {
// TODO : improve handling of specific transient exceptions once the client supports them.
// retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
} catch (IngestionClientException exception) {
throw new ConnectException(exception);
}
}
}
@ -200,8 +202,6 @@ class TopicPartitionWriter {
this.currentOffset = record.kafkaOffset();
fileWriter.write(value, record);
} catch (KafkaException ex) {
throw ex;
} catch (IOException ex) {
handleErrors(ex, "Failed to write records into file for ingestion.");
} finally {

Просмотреть файл

@ -36,8 +36,6 @@ public class E2ETest {
private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
private Logger log = Logger.getLogger(this.getClass().getName());
private static final Boolean COMMIT_IMMEDIATELY = true;
@Test
@Ignore

Просмотреть файл

@ -43,7 +43,7 @@ public class KustoSinkTaskTest {
@Test
public void testSinkTaskOpen() throws Exception {
HashMap<String, String> props = new HashMap<>();
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_URL_CONF, "https://cluster_name.kusto.windows.net");
props.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "[{'topic': 'topic1','db': 'db1', 'table': 'table1','format': 'csv'},{'topic': 'topic2','db': 'db1', 'table': 'table1','format': 'json','mapping': 'Mapping'}]");
props.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");

Просмотреть файл

@ -31,7 +31,6 @@ public class TopicPartitionWriterTest {
// TODO: should probably find a better way to mock internal class (FileWriter)...
private File currentDirectory;
private static final String KUSTO_CLUSTER_URL = "https://ingest-cluster.kusto.windows.net";
private static final Boolean COMMIT_IMMEDIATELY = true;
@Before
public final void before() {
@ -63,11 +62,7 @@ public class TopicPartitionWriterTest {
IngestionProperties ingestionProperties = new IngestionProperties(db, table);
TopicIngestionProperties props = new TopicIngestionProperties();
props.ingestionProperties = ingestionProperties;
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
Map<String, String> settings = getKustoConfigs(basePath, fileThreshold, flushInterval);
KustoSinkConfig config= new KustoSinkConfig(settings);
TopicPartitionWriter writer = new TopicPartitionWriter(tp, mockedClient, props, config);
@ -91,6 +86,20 @@ public class TopicPartitionWriterTest {
Assert.assertEquals(fileSourceInfoArgument.getValue().getRawSizeInBytes(), 1024);
}
private Map<String, String> getKustoConfigs(String basePath, long fileThreshold,
long flushInterval) {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
return settings;
}
@Test
public void testGetFilename() {
TopicPartition tp = new TopicPartition("testTopic", 11);
@ -107,6 +116,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
@ -130,6 +143,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
@ -162,6 +179,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
@ -212,6 +233,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
@ -250,6 +275,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);
@ -304,6 +333,10 @@ public class TopicPartitionWriterTest {
Map<String, String> settings = new HashMap<>();
settings.put(KustoSinkConfig.KUSTO_URL_CONF, KUSTO_CLUSTER_URL);
settings.put(KustoSinkConfig.KUSTO_SINK_TEMP_DIR_CONF, basePath);
settings.put(KustoSinkConfig.KUSTO_TABLES_MAPPING_CONF, "mapping");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPID_CONF, "some-appid");
settings.put(KustoSinkConfig.KUSTO_AUTH_APPKEY_CONF, "some-appkey");
settings.put(KustoSinkConfig.KUSTO_AUTH_AUTHORITY_CONF, "some-authority");
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_SIZE_BYTES_CONF, String.valueOf(fileThreshold));
settings.put(KustoSinkConfig.KUSTO_SINK_FLUSH_INTERVAL_MS_CONF, String.valueOf(flushInterval));
KustoSinkConfig config= new KustoSinkConfig(settings);