From df2cde2d118977b8341cbe063d9327cfe2e31fb1 Mon Sep 17 00:00:00 2001
From: Ohad Bitton
Date: Tue, 4 Feb 2020 09:57:45 +0200
Subject: [PATCH] e2e

---
 .../kusto/kafka/connect/sink/E2ETest.java | 160 ++++++++++++++++++
 src/test/resources/data.avro              | Bin 0 -> 1184 bytes
 2 files changed, 160 insertions(+)
 create mode 100644 src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
 create mode 100644 src/test/resources/data.avro

diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
new file mode 100644
index 0000000..bd983e8
--- /dev/null
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/E2ETest.java
@@ -0,0 +1,160 @@
+package com.microsoft.azure.kusto.kafka.connect.sink;
+
+import com.microsoft.azure.kusto.data.Client;
+import com.microsoft.azure.kusto.data.ClientFactory;
+import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
+import com.microsoft.azure.kusto.data.Results;
+import com.microsoft.azure.kusto.data.exceptions.DataClientException;
+import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
+import com.microsoft.azure.kusto.ingest.IngestClient;
+import com.microsoft.azure.kusto.ingest.IngestClientFactory;
+import com.microsoft.azure.kusto.ingest.IngestionMapping;
+import com.microsoft.azure.kusto.ingest.IngestionProperties;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.testng.Assert;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+// Example VM options for a local run (replace the placeholders; do not commit real credentials):
+// -ea -DappId=<appId> -DappKey=<appKey> -Dcluster=<cluster> -Ddatabase=<database> -Dtable=<table>
+public class E2ETest {
+    private static final String testPrefix = "tmpKafkaE2ETest";
+    private String appId = System.getProperty("appId");
+    private String appKey = System.getProperty("appKey");
+    private String authority = System.getProperty("authority", "microsoft.com");
+    private String cluster = System.getProperty("cluster");
+    private String database = System.getProperty("database");
+    private String tableBaseName = System.getProperty("table", testPrefix + UUID.randomUUID().toString().replace('-', '_'));
+    private String basePath = Paths.get("src/test/resources/", "testE2E").toString();
+
+    @Test
+    //@Ignore
+    public void testE2ECsv() throws URISyntaxException, DataClientException, DataServiceException {
+        String table = tableBaseName + "csv";
+        ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
+        Client engineClient = ClientFactory.createClient(engineCsb);
+
+        // Create the target table only when the test generated its own temporary table name
+        if (tableBaseName.startsWith(testPrefix)) {
+            engineClient.execute(database, String.format(".create table %s (ColA:string,ColB:int)", table));
+        }
+        try {
+            // Ordinal-based CSV ingestion mapping for the two columns
+            engineClient.execute(database, String.format(".create table ['%s'] ingestion csv mapping 'mappy' " +
+                    "'[" +
+                    "{\"column\":\"ColA\", \"DataType\":\"string\", \"Properties\":{\"Ordinal\":\"0\"}}," +
+                    "{\"column\":\"ColB\", \"DataType\":\"int\", \"Properties\":{\"Ordinal\":\"1\"}}" +
+                    "]'", table));
+
+            TopicPartition tp = new TopicPartition("testPartition", 11);
+
+            ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://ingest-%s.kusto.windows.net", cluster), appId, appKey, authority);
+            IngestClient ingestClient = IngestClientFactory.createClient(csb);
+            IngestionProperties ingestionProperties = new IngestionProperties(database, table);
+            String[] messages = new String[]{"stringy message,1", "another,2"};
+
+            // Expect the file to be finished after the fourth message because of fileThreshold
+            long fileThreshold = messages[0].length();
+            long flushInterval = 0;
+            TopicIngestionProperties props = new TopicIngestionProperties();
+            props.ingestionProperties = ingestionProperties;
+            props.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.csv);
+
+            TopicPartitionWriter writer = new TopicPartitionWriter(tp, ingestClient, props, Paths.get(basePath, "csv").toString(), fileThreshold, flushInterval);
+            writer.open();
+
+            List<SinkRecord> records = new ArrayList<>();
+            records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, Schema.BYTES_SCHEMA, messages[0].getBytes(), 10));
+            records.add(new SinkRecord(tp.topic(), tp.partition(), null, null, null, messages[1], 10));
+
+            for (SinkRecord record : records) {
+                writer.writeRecord(record);
+            }
+
+            validateExpectedResults(engineClient, 2, table);
+        } catch (InterruptedException e) {
+            Assert.fail("Test failed", e);
+        } finally {
+            if (table.startsWith(testPrefix)) {
+                engineClient.execute(database, ".drop table " + table);
+            }
+        }
+    }
+
+    @Test
+    //@Ignore
+    public void testE2EAvro() throws URISyntaxException, DataClientException, DataServiceException {
+        String table = tableBaseName + "avro";
+        ConnectionStringBuilder engineCsb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://%s.kusto.windows.net", cluster), appId, appKey, authority);
+        Client engineClient = ClientFactory.createClient(engineCsb);
+        try {
+            ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(String.format("https://ingest-%s.kusto.windows.net", cluster), appId, appKey, authority);
+            IngestClient ingestClient = IngestClientFactory.createClient(csb);
+            if (tableBaseName.startsWith(testPrefix)) {
+                engineClient.execute(database, String.format(".create table %s (ColA:string,ColB:int)", table));
+            }
+            // Field-based avro ingestion mapping for the two columns
+            engineClient.execute(database, String.format(".create table ['%s'] ingestion avro mapping 'avri' " +
+                    "'[" +
+                    "{\"column\": \"ColA\", \"Properties\":{\"Field\":\"XText\"}}," +
+                    "{\"column\": \"ColB\", \"Properties\":{\"Field\":\"RowNumber\"}}" +
+                    "]'", table));
+
+            IngestionProperties ingestionProperties = new IngestionProperties(database, table);
+
+            TopicIngestionProperties props2 = new TopicIngestionProperties();
+            props2.ingestionProperties = ingestionProperties;
+            props2.ingestionProperties.setDataFormat(IngestionProperties.DATA_FORMAT.avro);
+            props2.ingestionProperties.setIngestionMapping("avri", IngestionMapping.IngestionMappingKind.avro);
+            TopicPartition tp2 = new TopicPartition("testPartition2", 11);
+            TopicPartitionWriter writer2 = new TopicPartitionWriter(tp2, ingestClient, props2, Paths.get(basePath, "avro").toString(), 10, 300000);
+            writer2.open();
+            List<SinkRecord> records2 = new ArrayList<>();
+
+            FileInputStream fs = new FileInputStream("src/test/resources/data.avro");
+            byte[] buffer = new byte[1184];
+            if (fs.read(buffer) != 1184) {
+                Assert.fail("Error while reading the avro test file");
+            }
+            records2.add(new SinkRecord(tp2.topic(), tp2.partition(), null, null,
+                    Schema.BYTES_SCHEMA, buffer, 10));
+            for (SinkRecord record : records2) {
+                writer2.writeRecord(record);
+            }
+
+            validateExpectedResults(engineClient, 2, table);
+        } catch (InterruptedException | IOException e) {
+            Assert.fail("Test failed", e);
+        } finally {
+            if (table.startsWith(testPrefix)) {
+                engineClient.execute(database, ".drop table " + table);
+            }
+        }
+    }
+
+    // Polls the table's row count until the expected number of rows arrives or the timeout elapses.
+    private void validateExpectedResults(Client engineClient, Integer expectedNumberOfRows, String table) throws InterruptedException, DataClientException, DataServiceException {
+        String query = String.format("%s | count", table);
+
+        Results res = engineClient.execute(database, query);
+        Integer timeoutMs = 60 * 6 * 1000;
+        Integer rowCount = 0;
+        Integer timeElapsedMs = 0;
+        Integer sleepPeriodMs = 5 * 1000;
+
+        while (rowCount < expectedNumberOfRows && timeElapsedMs < timeoutMs) {
+            Thread.sleep(sleepPeriodMs);
+            res = engineClient.execute(database, query);
+            rowCount = Integer.valueOf(res.getValues().get(0).get(0));
+            timeElapsedMs += sleepPeriodMs;
+        }
+        Assertions.assertEquals(expectedNumberOfRows.toString(), res.getValues().get(0).get(0));
+    }
+}
diff --git a/src/test/resources/data.avro b/src/test/resources/data.avro
new file mode 100644
index 0000000000000000000000000000000000000000..83d37bc1eb3657611bc6432dddca0147fce753da
GIT binary patch
literal 1184
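
Running the E2E tests locally (a sketch, not something this patch defines): assuming the project's standard Maven layout with the Surefire plugin, the tests can be pointed at a cluster by passing the same system properties the test class reads (appId, appKey, authority, cluster, database, and optionally table). The exact mvn invocation and the placeholder values below are assumptions:

    mvn test -Dtest=E2ETest -DappId=<appId> -DappKey=<appKey> -Dauthority=<tenant> -Dcluster=<cluster> -Ddatabase=<database>

When -Dtable is omitted, each test creates a temporary table prefixed with tmpKafkaE2ETest and drops it in its finally block.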