Merge stream changes with this branch

This commit is contained in:
rabee333 2018-09-06 10:01:27 +03:00
Родитель 0993768246
Коммит 83890ed24c
3 изменённых файлов: 66 добавлений и 19 удалений

Просмотреть файл

@ -6,7 +6,10 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@ -62,6 +65,29 @@ class AzureStorageHelperTest {
} }
@Test @Test
void getBlobPathWithSas() { void uploadFromStreamToBlob() {
try{
OutputStream outputStreamMock = mock(OutputStream.class);
doNothing().when(outputStreamMock).write(any(byte[].class),anyInt(),anyInt());
String testFilePath = Paths.get("src","test","resources","testdata.json").toString();
InputStream stream = new FileInputStream(testFilePath);
azureStorageHelperMock.uploadStreamToBlob(stream,"blobName","https://ms.com/blob",false);
verify(azureStorageHelperMock).uploadStreamToBlob(any(),anyString(),anyString(),anyBoolean());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
void uploadFromStreamToBlobCompress() {
try{
OutputStream outputStreamMock = mock(OutputStream.class);
doNothing().when(outputStreamMock).write(any(byte[].class),anyInt(),anyInt());
String testFilePath = Paths.get("src","test","resources","testdata.json").toString();
InputStream stream = new FileInputStream(testFilePath);
azureStorageHelperMock.uploadStreamToBlob(stream,"blobName","https://ms.com/blob",true);
verify(azureStorageHelperMock).uploadStreamToBlob(any(),anyString(),anyString(),anyBoolean());
} catch (Exception e) {
e.printStackTrace();
}
} }
} }

Просмотреть файл

@ -2,11 +2,14 @@ package com.microsoft.azure.kusto.ingest;
import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo; import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.blob.CloudBlockBlob; import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -17,14 +20,14 @@ class KustoIngestClientImplTest {
ResourceManager resourceManagerMock = mock(ResourceManager.class); ResourceManager resourceManagerMock = mock(ResourceManager.class);
KustoIngestClient ingestClientMock; KustoIngestClient ingestClientMock;
KustoIngestClientImpl batchIngestClientMock; KustoIngestClientImpl ingestClientMockImpl;
KustoIngestionProperties props; KustoIngestionProperties props;
@BeforeEach @BeforeEach
void setUp() { void setUp() {
try { try {
ingestClientMock = mock(KustoIngestClient.class); ingestClientMock = mock(KustoIngestClient.class);
batchIngestClientMock = mock(KustoIngestClientImpl.class); ingestClientMockImpl = mock(KustoIngestClientImpl.class);
when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE)) when(resourceManagerMock.getIngestionResource(ResourceManager.ResourceTypes.SECURED_READY_FOR_AGGREGATION_QUEUE))
.thenReturn("queue1") .thenReturn("queue1")
@ -75,23 +78,41 @@ class KustoIngestClientImplTest {
@Test @Test
void ingestFromFile() { void ingestFromFile() {
try { try {
String testFilePath = Paths.get("src","test","resources","testdata.json").toString(); String testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
when(ingestClientMockImpl.uploadLocalFileToBlob(isA(String.class), isA(String.class), isA(String.class)))
when(batchIngestClientMock.uploadLocalFileToBlob(isA(String.class),isA(String.class),isA(String.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri"))); .thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));
doNothing().when(batchIngestClientMock).postMessageToQueue(isA(String.class),isA(String.class)); doNothing().when(ingestClientMockImpl).postMessageToQueue(isA(String.class), isA(String.class));
FileSourceInfo fileSourceInfo = new FileSourceInfo(testFilePath,0); FileSourceInfo fileSourceInfo = new FileSourceInfo(testFilePath, 0);
int numOfFiles = 3; int numOfFiles = 3;
for(int i=0; i<numOfFiles; i++){ for (int i = 0; i < numOfFiles; i++) {
ingestClientMock.ingestFromFile(fileSourceInfo,props); ingestClientMock.ingestFromFile(fileSourceInfo, props);
} }
verify(ingestClientMock, times(numOfFiles)).ingestFromFile(fileSourceInfo,props); verify(ingestClientMock, times(numOfFiles)).ingestFromFile(fileSourceInfo, props);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
}
@Test
void ingestFromStream() {
try {
String testFilePath = Paths.get("src", "test", "resources", "testdata.json").toString();
when(ingestClientMockImpl.uploadStreamToBlob(isA(InputStream.class), isA(String.class), isA(String.class), isA(Boolean.class)))
.thenReturn(new CloudBlockBlob(new URI("https://ms.com/storageUri")));
doNothing().when(ingestClientMockImpl).postMessageToQueue(isA(String.class), isA(String.class));
int numOfFiles = 3;
for (int i = 0; i < numOfFiles; i++) {
InputStream stream = new FileInputStream(testFilePath);
StreamSourceInfo streamSourceInfo = new StreamSourceInfo(stream,false);
ingestClientMock.ingestFromStream(streamSourceInfo, props);
}
verify(ingestClientMock, times(numOfFiles)).ingestFromStream(any(StreamSourceInfo.class), any(KustoIngestionProperties.class));
} catch (Exception e) {
e.printStackTrace();
}
}
}

Просмотреть файл

@ -6,15 +6,15 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
public class FileIngestion { public class FileIngestion {
private static final String appId = "2cb0cc4c-c9be-4301-b2aa-718935b0ce1d"; private static final String appId = "";
private static final String appKey = "11hZc+sXY7cwFQ91DMmjEvPFfBHxN8P25kV+BH8A9qg="; //2cb0cc4c-c9be-4301-b2aa-718935b0ce1d private static final String appKey = "";
public static void main(String[] args) { public static void main(String[] args) {
try { try {
String kustoClusterPath = "https://ingest-csetests.westeurope.kusto.windows.net"; String kustoClusterPath = "https://ingest-<cluster-name>.kusto.windows.net";
String dbName = "raabusal"; String dbName = "";
String tableName = "test1"; String tableName = "";
String dataMappingName = "map1"; String dataMappingName = "";
String dataFormat = "json"; String dataFormat = "json";
KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(kustoClusterPath,appId,appKey); KustoConnectionStringBuilder kcsb = KustoConnectionStringBuilder.createWithAadApplicationCredentials(kustoClusterPath,appId,appKey);