Ohad Bitton 2022-08-21 16:17:04 +03:00
Parent a8dfb0eb47
Commit 9765d0c501
8 changed files with 8 additions and 30 deletions

View file

@@ -216,11 +216,6 @@
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor-core.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>

View file

@@ -47,7 +47,7 @@ public class AzureStorageClient {
tableClient.createEntity(tableEntity);
}
BlobClient uploadLocalFileToBlob(File file, String blobName, BlobContainerClient container, boolean shouldCompress)
void uploadLocalFileToBlob(File file, String blobName, BlobContainerClient container, boolean shouldCompress)
throws IOException {
log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", file.getPath(), blobName, container.getBlobContainerUrl());
@@ -62,8 +62,6 @@ public class AzureStorageClient {
} else {
uploadFileToBlob(file, blobClient);
}
return blobClient;
}
void uploadFileToBlob(File sourceFile, BlobClient blobClient) throws IOException {
@@ -84,7 +82,7 @@ public class AzureStorageClient {
}
}
BlobClient uploadStreamToBlob(InputStream inputStream, String blobName, BlobContainerClient container, boolean shouldCompress)
void uploadStreamToBlob(InputStream inputStream, String blobName, BlobContainerClient container, boolean shouldCompress)
throws IOException, URISyntaxException {
log.debug("uploadStreamToBlob: blobName: {}, storageUri: {}", blobName, container);
@@ -99,8 +97,6 @@ public class AzureStorageClient {
} else {
uploadStream(inputStream, blobClient);
}
return blobClient;
}
void uploadStream(InputStream inputStream, BlobClient blob) throws IOException {
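
This file section drops the BlobClient return value from uploadLocalFileToBlob and uploadStreamToBlob. A minimal sketch of the simplified file-upload method after the change, assuming the blob is resolved with container.getBlobClient(blobName) and that the compressAndUploadFileToBlob helper exists elsewhere in this class (both details sit outside the diff context):

void uploadLocalFileToBlob(File file, String blobName, BlobContainerClient container, boolean shouldCompress)
        throws IOException {
    // Resolve the target blob inside the temp container by name.
    BlobClient blobClient = container.getBlobClient(blobName);
    if (shouldCompress) {
        compressAndUploadFileToBlob(file, blobClient); // gzip while uploading (helper assumed from this class)
    } else {
        uploadFileToBlob(file, blobClient);
    }
    // Nothing is returned anymore; callers rebuild the blob URL from the container URL and blob name.
}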

View file

@@ -169,10 +169,9 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
shouldCompress ? CompressionType.gz : sourceCompressionType);
ContainerWithSas container = resourceManager.getTempStorage();
String blobPath;
BlobClient blobClient = azureStorageClient.uploadLocalFileToBlob(file, blobName,
azureStorageClient.uploadLocalFileToBlob(file, blobName,
container.getContainer(), shouldCompress);
blobPath = azureStorageClient.getBlobPathWithSas(blobClient.getBlobUrl(),container.getSas());
String blobPath = container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas();
long rawDataSize = fileSourceInfo.getRawSizeInBytes() > 0L ? fileSourceInfo.getRawSizeInBytes() :
estimateFileRawSize(filePath, ingestionProperties.getDataFormat().isCompressible());
@@ -223,7 +222,7 @@ public class QueuedIngestClient extends IngestClientBase implements IngestClient
container.getContainer(),
shouldCompress
);
String blobPath = azureStorageClient.getBlobPathWithSas(container.getSas(), blobName);
String blobPath = container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas();
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(
blobPath, 0); // TODO: check if we can get the rawDataSize locally - maybe add a countingStream
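
With no BlobClient coming back from the upload, the queued client now composes the blob URI itself from pieces it already has. A hedged sketch of the file-ingestion flow around this hunk (it assumes the SAS string returned by getSas() already starts with '?', so plain concatenation yields a valid URL):

ContainerWithSas container = resourceManager.getTempStorage();
azureStorageClient.uploadLocalFileToBlob(file, blobName, container.getContainer(), shouldCompress);
// e.g. https://account.blob.core.windows.net/temp-container/<blobName>?sv=...&sig=...
String blobPath = container.getContainer().getBlobContainerUrl() + "/" + blobName + container.getSas();
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath, rawDataSize);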

View file

@@ -237,7 +237,6 @@ class ResourceManager implements Closeable {
if (ingestionResourcesLock.writeLock().tryLock()) {
try {
log.info("Refreshing Ingestion Resources");
KustoOperationResult ingestionResourcesResults = client.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND);
this.containers = new IngestionResource<>(ResourceType.TEMP_STORAGE);
this.queues = new IngestionResource<>(ResourceType.SECURED_READY_FOR_AGGREGATION_QUEUE);
this.successfulIngestionsQueues = new IngestionResource<>(ResourceType.SUCCESSFUL_INGESTIONS_QUEUE);
@@ -247,7 +246,6 @@ class ResourceManager implements Closeable {
CheckedFunction0<KustoOperationResult> retryExecute = Retry.decorateCheckedSupplier(retry,
() -> client.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND));
KustoOperationResult ingestionResourcesResults = retryExecute.apply();
ingestionResources = Collections.synchronizedMap(new EnumMap<>(ResourceType.class));
if (ingestionResourcesResults != null && ingestionResourcesResults.hasNext()) {
KustoResultSetTable table = ingestionResourcesResults.next();
// Add the received values to a new IngestionResources map
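
The standalone client.execute call is removed because the refresh now goes only through the retry-decorated supplier shown above. A minimal sketch of that resilience4j pattern (uses io.github.resilience4j.retry.Retry and RetryConfig, io.vavr.CheckedFunction0, and java.time.Duration; the retry configuration below is an assumption, the real instance is built elsewhere in this class):

// Retry the ingestion-resources control command on transient failures instead of calling client.execute directly.
RetryConfig retryConfig = RetryConfig.custom()
        .maxAttempts(3)
        .waitDuration(Duration.ofSeconds(1))
        .build(); // assumed settings for illustration
Retry retry = Retry.of("refreshIngestionResources", retryConfig);
CheckedFunction0<KustoOperationResult> retryExecute = Retry.decorateCheckedSupplier(retry,
        () -> client.execute(Commands.INGESTION_RESOURCES_SHOW_COMMAND));
KustoOperationResult ingestionResourcesResults = retryExecute.apply(); // rethrows the last failure if all attempts fail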

View file

@@ -87,9 +87,6 @@ class ManagedStreamingIngestClientTest {
when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken");
when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), any(), anyBoolean()))
.thenReturn(new BlobClientBuilder().endpoint(STORAGE_URL).blobName("blobName").buildClient());
when(azureStorageClientMock.getBlobPathWithSas(anyString(), anyString())).thenReturn(STORAGE_URL);
doNothing().when(azureStorageClientMock).azureTableInsertEntity(any(), any(TableEntity.class));
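
Because uploadStreamToBlob is now void, its when(...).thenReturn(...) stub and the getBlobPathWithSas stub are simply deleted; a Mockito mock already does nothing for void methods. If a test still wants to assert that the upload happened, a verify call is enough (sketch, using the mock name from this test):

verify(azureStorageClientMock).uploadStreamToBlob(any(InputStream.class), anyString(), any(), anyBoolean());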

View file

@@ -68,14 +68,8 @@ class QueuedIngestClientTest {
when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken");
when(azureStorageClientMock.uploadStreamToBlob(any(InputStream.class), anyString(), any(), anyBoolean()))
.thenReturn(new BlobClientBuilder().endpoint(STORAGE_URL).blobName("blobName").buildClient());
when(azureStorageClientMock.getBlobPathWithSas(anyString(), anyString())).thenReturn(STORAGE_URL);
when(azureStorageClientMock.uploadLocalFileToBlob(any(File.class), anyString(), any(), anyBoolean()))
.thenReturn(new BlobClientBuilder().endpoint(STORAGE_URL).blobName("blobName").buildClient());
doNothing().when(azureStorageClientMock).azureTableInsertEntity(any(), any(TableEntity.class));
doNothing().when(azureStorageClientMock).postMessageToQueue(any(), anyString());
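
The same simplification applies to both upload overloads in this test. A sketch of the trimmed mock setup after the change, keeping only the stubs the test still needs:

when(resourceManagerMock.getIdentityToken()).thenReturn("identityToken");
// uploadStreamToBlob and uploadLocalFileToBlob return void now, so no return-value stubbing is required.
doNothing().when(azureStorageClientMock).azureTableInsertEntity(any(), any(TableEntity.class));
doNothing().when(azureStorageClientMock).postMessageToQueue(any(), anyString());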

View file

@@ -612,7 +612,7 @@ class StreamingIngestClientTest {
IngestionClientException ingestionClientException = assertThrows(IngestionClientException.class,
() -> streamingIngestClient.ingestFromBlob(blobSourceInfo2, ingestionProperties),
"Expected IngestionClientException to be thrown, but it didn't");
assertTrue(ingestionClientException.getMessage().contains("Unexpected Storage error when ingesting a blob."));
assertTrue(ingestionClientException.getMessage().contains("Exception trying to read blob metadata"));
}
@Test
@@ -633,7 +633,7 @@ class StreamingIngestClientTest {
}
@Test
void IngestFromResultSet() throws Exception {
void IngestFbromResultSet() throws Exception {
ResultSetMetaData resultSetMetaData = mock(ResultSetMetaData.class);
ResultSet resultSet = mock(ResultSet.class);

View file

@@ -47,11 +47,10 @@
<httpcore.version>4.4.15</httpcore.version>
<json.version>20201115</json.version>
<fasterxml.jackson.core.version>2.13.1</fasterxml.jackson.core.version>
<reactor-core.version>3.4.10</reactor-core.version>
<univocity-parsers.version>2.9.1</univocity-parsers.version>
<!-- Bom 1.0.6 targets azure-core with same fasterxml version as databricks runtime 10
https://docs.microsoft.com/en-us/azure/databricks/release-notes/runtime/10.3-->
<azure-bom-version>1.2.0</azure-bom-version>
<azure-bom-version>1.2.4</azure-bom-version>
<resilience4j.version>1.7.1</resilience4j.version>
<io.vavr.version>0.10.2</io.vavr.version>
<!-- Test dependencies -->