From 2e3232717fed1e1fd1f518b4f09b1a0eb5ed84f4 Mon Sep 17 00:00:00 2001 From: Wei Zhang Date: Tue, 6 Aug 2019 17:24:06 +0800 Subject: [PATCH] Add Azure Blob storage deploy unit tests with fixing comments --- pom.xml | 2 +- .../tools/clusters/HdiClusterDetail.java | 3 +- .../tools/job/AzureBlobStorageDeploy.java | 44 ++++--- .../azure/spark/tools/job/Deployable.java | 4 +- .../azure/spark/tools/job/LivySparkBatch.java | 3 +- .../tools/job/LivySparkBatchFactory.java | 16 +-- .../azure/spark/tools/utils/WasbUri.java | 4 + .../job/AzureBlobStorageDeployScenario.java | 121 ++++++++++++++++++ .../tools/job/AzureBlobStorageDeployTest.java | 17 +++ .../AzureBlobStorageDeployScenario.feature | 10 ++ .../org.mockito.plugins.MockMaker | 1 + 11 files changed, 190 insertions(+), 35 deletions(-) create mode 100644 src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.java create mode 100644 src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployTest.java create mode 100644 src/test/resources/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.feature create mode 100644 src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/pom.xml b/pom.xml index 787cdb6..4be8b4e 100644 --- a/pom.xml +++ b/pom.xml @@ -229,7 +229,7 @@ org.mockito mockito-core - 2.7.22 + 2.28.2 test diff --git a/src/main/java/com/microsoft/azure/spark/tools/clusters/HdiClusterDetail.java b/src/main/java/com/microsoft/azure/spark/tools/clusters/HdiClusterDetail.java index bb73a23..6aacbe4 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/clusters/HdiClusterDetail.java +++ b/src/main/java/com/microsoft/azure/spark/tools/clusters/HdiClusterDetail.java @@ -167,8 +167,7 @@ public final class HdiClusterDetail implements ClusterDetail, HdiCluster, Cluste } final WasbUri defaultBlobUri = WasbUri.parse(defaultFS); - final String storageKey = getCoreSiteConfig().get(AzureBlobStorageDeploy.getHadoopBlobFsPropertyKey( - defaultBlobUri.getStorageAccount(), defaultBlobUri.getEndpointSuffix())); + final String storageKey = getCoreSiteConfig().get(defaultBlobUri.getHadoopBlobFsPropertyKey()); return new AzureBlobStorageDeploy(storageKey, defaultBlobUri); } diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeploy.java b/src/main/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeploy.java index 8eed40c..67fb84a 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeploy.java +++ b/src/main/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeploy.java @@ -9,14 +9,16 @@ import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType; +import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.CloudBlockBlob; import rx.Observable; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.net.URISyntaxException; public class AzureBlobStorageDeploy implements Deployable, Logger { @@ -46,46 +48,48 @@ public class AzureBlobStorageDeploy implements Deployable, Logger { } } + public CloudStorageAccount getStorageAccount() { + return storageAccount; + } + + public WasbUri getFsRoot() { + return fsRoot; + } + /** * Upload a local file to Azure Blob storage. * * @param fileToUpload the local file to upload - * @param storageAccount the Azure storage account - * @param containerName the container name to upload * @param blobName the blob name to upload * @return the WASB URI for the file uploaded * @throws URISyntaxException the wrong blob or container name caused URI syntax exception * @throws StorageException the Azure storage exception when operating blob containers * @throws IOException the networking or local storage exceptions */ - public static String uploadFileAsBlob(final File fileToUpload, - final CloudStorageAccount storageAccount, - final String containerName, - final String blobName) + private String uploadFileAsBlob(final File fileToUpload, final String blobName) throws URISyntaxException, StorageException, IOException { - final CloudBlobContainer blobContainer = getBlobContainer(storageAccount, containerName); + final CloudBlobContainer blobContainer = getBlobContainer(); blobContainer.createIfNotExists(BlobContainerPublicAccessType.BLOB, null, null); - final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName); - blob.upload(new FileInputStream(fileToUpload), fileToUpload.length()); + final CloudBlob blob = blobContainer.getBlockBlobReference(blobName); + blob.upload(createFileInputStream(fileToUpload), fileToUpload.length()); return WasbUri.parse(blob.getUri().toString()).getUri().toString(); } - public static CloudBlobContainer getBlobContainer(final CloudStorageAccount storageAccount, - final String containerName) - throws URISyntaxException, StorageException { - final CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); - return blobClient.getContainerReference(containerName); + private CloudBlobContainer getBlobContainer() throws URISyntaxException, StorageException { + final CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient(); + return blobClient.getContainerReference(getFsRoot().getContainer()); } - public static String getHadoopBlobFsPropertyKey(final String storageAccountName, final String endpointSuffix) { - return String.format("fs.azure.account.key.%s.blob.%s", storageAccountName, endpointSuffix); + protected InputStream createFileInputStream(final File file) throws FileNotFoundException { + return new FileInputStream(file); } @Override public Observable deploy(final File src) { - final String destRelatedBlobName = getRelatedDestParentPath() + src.getName(); - return Observable.fromCallable(() -> fsRoot.getUri().resolve( - uploadFileAsBlob(src, storageAccount, fsRoot.getContainer(), destRelatedBlobName)).toString()); + final String destRelatedBlobName = getDestRelativePath() + src.getName(); + + return Observable.fromCallable(() -> + getFsRoot().getUri().resolve(uploadFileAsBlob(src, destRelatedBlobName)).toString()); } } diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/Deployable.java b/src/main/java/com/microsoft/azure/spark/tools/job/Deployable.java index 5e621e5..432a079 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/job/Deployable.java +++ b/src/main/java/com/microsoft/azure/spark/tools/job/Deployable.java @@ -21,13 +21,13 @@ public interface Deployable { Observable deploy(final File src); /** - * Get a related path to destination parent folder path based for uploading artifacts. + * Get a relative path to destination parent folder path based for uploading artifacts. * such as: `SparkSubmission/2019/01/20/_random_uuid_/` * * @return a related path for artifacts uploading. The default method return a path * with destination folder, date and random UUID */ - default String getRelatedDestParentPath() { + default String getDestRelativePath() { int year = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.YEAR); int month = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.MONTH) + 1; int day = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.DAY_OF_MONTH); diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java b/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java index 7a2ec9a..b0092fa 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java +++ b/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatch.java @@ -235,8 +235,7 @@ public class LivySparkBatch implements SparkBatchJob, Logger { appIdWithLog.getRight().getLog().size() == 0 && ((!StringUtils.equalsIgnoreCase(getState(), "starting") && appIdWithLog.getLeft() != null) - || (StringUtils.equalsIgnoreCase(getState(), "dead") - && appIdWithLog.getLeft() == null)); + || StringUtils.equalsIgnoreCase(getState(), "dead")); getSparkJobApplicationId() .flatMap(applicationId -> getSparkBatchLogRequest(start.get(), maxLinesPerGet), Pair::of) diff --git a/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatchFactory.java b/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatchFactory.java index 8575470..4e38d7e 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatchFactory.java +++ b/src/main/java/com/microsoft/azure/spark/tools/job/LivySparkBatchFactory.java @@ -3,19 +3,19 @@ package com.microsoft.azure.spark.tools.job; -import com.microsoft.azure.spark.tools.clusters.LivyCluster; -import com.microsoft.azure.spark.tools.events.MessageInfoType; -import com.microsoft.azure.spark.tools.http.HttpObservable; -import com.microsoft.azure.spark.tools.log.Logger; -import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches; -import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches.Options; -import com.microsoft.azure.spark.tools.utils.Pair; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.LoggerFactory; import rx.Observer; import rx.subjects.PublishSubject; -public class LivySparkBatchFactory implements SparkBatchJobFactory, Logger { +import com.microsoft.azure.spark.tools.clusters.LivyCluster; +import com.microsoft.azure.spark.tools.events.MessageInfoType; +import com.microsoft.azure.spark.tools.http.HttpObservable; +import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches; +import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches.Options; +import com.microsoft.azure.spark.tools.utils.Pair; + +public class LivySparkBatchFactory implements SparkBatchJobFactory { private final LivyCluster cluster; private final PostBatches submissionParameter; private @Nullable HttpObservable http; diff --git a/src/main/java/com/microsoft/azure/spark/tools/utils/WasbUri.java b/src/main/java/com/microsoft/azure/spark/tools/utils/WasbUri.java index d012e1f..6b58a93 100644 --- a/src/main/java/com/microsoft/azure/spark/tools/utils/WasbUri.java +++ b/src/main/java/com/microsoft/azure/spark/tools/utils/WasbUri.java @@ -66,6 +66,10 @@ public final class WasbUri { return this.absolutePath.get(); } + public String getHadoopBlobFsPropertyKey() { + return String.format("fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix()); + } + @Override public String toString() { return rawUri.toString(); diff --git a/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.java b/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.java new file mode 100644 index 0000000..ce0b084 --- /dev/null +++ b/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.java @@ -0,0 +1,121 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package com.microsoft.azure.spark.tools.job; + +import cucumber.api.java.Before; +import cucumber.api.java.en.And; +import cucumber.api.java.en.Given; +import cucumber.api.java.en.Then; +import org.mockito.Mockito; +import rx.Observable; + +import com.microsoft.azure.spark.tools.utils.WasbUri; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentialsAccountAndKey; +import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.core.Base64; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.withSettings; + +public class AzureBlobStorageDeployScenario { + private AzureBlobStorageDeploy deploy; + private Observable deploySub; + private CloudStorageAccount mockStorageAccount; + private CloudBlobClient mockBlobClient; + private CloudBlobContainer mockBlobContainer; + private CloudBlockBlob mockBlob; + + static class DummyFileInputStream extends InputStream { + @Override + public int read() throws IOException { + return 0; + } + } + + static class DummyDeploy implements Deployable { + @Override + public Observable deploy(File src) { + return Observable.empty(); + } + } + + @Before + public void Setup() { + + } + + @Given("create an AzureBlobStorageDeploy with access key {string} and fs root {string}") + public void createAzureBlobStorageDeploy(String key, String fsRoot) { + this.deploy = new AzureBlobStorageDeploy(Base64.encode(key.getBytes()), WasbUri.parse(fsRoot)); + } + + @Then("check the AzureBlobStorageDeploy storage account credential is {string} and account is {string}") + public void checkTheAzureBlobStorageDeployStorageAccountCredential(String expectKey, String expectAccount) { + StorageCredentialsAccountAndKey actual = (StorageCredentialsAccountAndKey) + this.deploy.getStorageAccount().getCredentials(); + + assertEquals(expectKey, new String(actual.exportKey())); + assertEquals(expectAccount, actual.getAccountName()); + } + + @Given("create an AzureBlobStorageDeploy with mocked storage account and fs root {string}") + public void createAnAzureBlobStorageDeployWithMockedStorageAccount(String fsRoot) throws Throwable { + // Mock storage account + this.mockStorageAccount = Mockito.mock(CloudStorageAccount.class); + this.mockBlobClient = Mockito.mock(CloudBlobClient.class); + this.mockBlobContainer = Mockito.mock(CloudBlobContainer.class); + this.mockBlob = Mockito.mock(CloudBlockBlob.class); + + WasbUri fsUri = WasbUri.parse(fsRoot); + this.deploy = Mockito.mock( + AzureBlobStorageDeploy.class, + withSettings() + .useConstructor(this.mockStorageAccount, fsUri) + .defaultAnswer(CALLS_REAL_METHODS)); + + doReturn(mockBlobClient).when(mockStorageAccount).createCloudBlobClient(); + doReturn(mockBlobContainer).when(mockBlobClient).getContainerReference(fsUri.getContainer()); + doNothing().when(mockBlobContainer).create(BlobContainerPublicAccessType.BLOB, null, null); + } + + @And("perform an Azure blob deploy operation for the local file {string}") + public void performAzureBlobDeploy(String mockFilePath) throws Throwable { + File mockFile = new File(mockFilePath); + + doReturn(this.mockBlob).when(this.mockBlobContainer) + .getBlockBlobReference(argThat(someString -> someString.endsWith(mockFile.getName()))); + + doReturn(this.deploy.getFsRoot().getUri().resolve( + new DummyDeploy().getDestRelativePath() + mockFile.getName())).when(this.mockBlob) + .getUri(); + + doReturn(new DummyFileInputStream()).when(this.deploy).createFileInputStream(mockFile); + + this.deploySub = this.deploy.deploy(mockFile); + } + + @Then("check the destination URI should match regex {string}") + public void checkTheDestinationURIRegexMatch(String expectRegex) { + String actualDestUri = this.deploySub.toBlocking().single(); + + assertTrue("The actual deploy destination URI " + actualDestUri + " doesn't match " + expectRegex, + Pattern.compile(expectRegex).matcher(actualDestUri).matches()); + + } +} diff --git a/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployTest.java b/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployTest.java new file mode 100644 index 0000000..fff0c5a --- /dev/null +++ b/src/test/java/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployTest.java @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +package com.microsoft.azure.spark.tools.job; + +import cucumber.api.CucumberOptions; +import cucumber.api.junit.Cucumber; +import org.junit.runner.RunWith; + +@RunWith(Cucumber.class) +@CucumberOptions( + plugin = {"html:target/cucumber"}, + features = { "src/test/resources/com/microsoft/azure/spark/tools/" + + "job/AzureBlobStorageDeployScenario.feature"} +) +public class AzureBlobStorageDeployTest { +} diff --git a/src/test/resources/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.feature b/src/test/resources/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.feature new file mode 100644 index 0000000..27f379c --- /dev/null +++ b/src/test/resources/com/microsoft/azure/spark/tools/job/AzureBlobStorageDeployScenario.feature @@ -0,0 +1,10 @@ +Feature: AzureBlobStorageDeploy unit tests + + Scenario: Create an AzureBlobStorageDeploy with access key and fs root + Given create an AzureBlobStorageDeploy with access key 'mockkey' and fs root 'wasbs://mycontainer@mockaccount.blob.core.windows.net/' + Then check the AzureBlobStorageDeploy storage account credential is 'mockkey' and account is 'mockaccount' + + Scenario: Create an AzureBlobStorageDeploy with mocked storage account key and fs root + Given create an AzureBlobStorageDeploy with mocked storage account and fs root 'wasbs://mycontainer@mockaccount.blob.core.windows.net/' + And perform an Azure blob deploy operation for the local file '/tmp/mock_artifact.jar' + Then check the destination URI should match regex 'wasbs://mycontainer@mockaccount.blob.core.windows.net/SparkSubmission/\d{4}/\d{2}/\d{2}/[^/]+/mock_artifact.jar' \ No newline at end of file diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..ca6ee9c --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file