Add Azure Blob storage deploy unit tests with fixing comments
This commit is contained in:
Родитель
a5bac3196c
Коммит
2e3232717f
2
pom.xml
2
pom.xml
|
@ -229,7 +229,7 @@
|
|||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
<version>2.7.22</version>
|
||||
<version>2.28.2</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
|
|
|
@ -167,8 +167,7 @@ public final class HdiClusterDetail implements ClusterDetail, HdiCluster, Cluste
|
|||
}
|
||||
|
||||
final WasbUri defaultBlobUri = WasbUri.parse(defaultFS);
|
||||
final String storageKey = getCoreSiteConfig().get(AzureBlobStorageDeploy.getHadoopBlobFsPropertyKey(
|
||||
defaultBlobUri.getStorageAccount(), defaultBlobUri.getEndpointSuffix()));
|
||||
final String storageKey = getCoreSiteConfig().get(defaultBlobUri.getHadoopBlobFsPropertyKey());
|
||||
|
||||
return new AzureBlobStorageDeploy(storageKey, defaultBlobUri);
|
||||
}
|
||||
|
|
|
@ -9,14 +9,16 @@ import com.microsoft.azure.storage.CloudStorageAccount;
|
|||
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import rx.Observable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
public class AzureBlobStorageDeploy implements Deployable, Logger {
|
||||
|
@ -46,46 +48,48 @@ public class AzureBlobStorageDeploy implements Deployable, Logger {
|
|||
}
|
||||
}
|
||||
|
||||
public CloudStorageAccount getStorageAccount() {
|
||||
return storageAccount;
|
||||
}
|
||||
|
||||
public WasbUri getFsRoot() {
|
||||
return fsRoot;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a local file to Azure Blob storage.
|
||||
*
|
||||
* @param fileToUpload the local file to upload
|
||||
* @param storageAccount the Azure storage account
|
||||
* @param containerName the container name to upload
|
||||
* @param blobName the blob name to upload
|
||||
* @return the WASB URI for the file uploaded
|
||||
* @throws URISyntaxException the wrong blob or container name caused URI syntax exception
|
||||
* @throws StorageException the Azure storage exception when operating blob containers
|
||||
* @throws IOException the networking or local storage exceptions
|
||||
*/
|
||||
public static String uploadFileAsBlob(final File fileToUpload,
|
||||
final CloudStorageAccount storageAccount,
|
||||
final String containerName,
|
||||
final String blobName)
|
||||
private String uploadFileAsBlob(final File fileToUpload, final String blobName)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
final CloudBlobContainer blobContainer = getBlobContainer(storageAccount, containerName);
|
||||
final CloudBlobContainer blobContainer = getBlobContainer();
|
||||
blobContainer.createIfNotExists(BlobContainerPublicAccessType.BLOB, null, null);
|
||||
|
||||
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
blob.upload(new FileInputStream(fileToUpload), fileToUpload.length());
|
||||
final CloudBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
blob.upload(createFileInputStream(fileToUpload), fileToUpload.length());
|
||||
return WasbUri.parse(blob.getUri().toString()).getUri().toString();
|
||||
}
|
||||
|
||||
public static CloudBlobContainer getBlobContainer(final CloudStorageAccount storageAccount,
|
||||
final String containerName)
|
||||
throws URISyntaxException, StorageException {
|
||||
final CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
|
||||
return blobClient.getContainerReference(containerName);
|
||||
private CloudBlobContainer getBlobContainer() throws URISyntaxException, StorageException {
|
||||
final CloudBlobClient blobClient = getStorageAccount().createCloudBlobClient();
|
||||
return blobClient.getContainerReference(getFsRoot().getContainer());
|
||||
}
|
||||
|
||||
public static String getHadoopBlobFsPropertyKey(final String storageAccountName, final String endpointSuffix) {
|
||||
return String.format("fs.azure.account.key.%s.blob.%s", storageAccountName, endpointSuffix);
|
||||
protected InputStream createFileInputStream(final File file) throws FileNotFoundException {
|
||||
return new FileInputStream(file);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Observable<String> deploy(final File src) {
|
||||
final String destRelatedBlobName = getRelatedDestParentPath() + src.getName();
|
||||
return Observable.fromCallable(() -> fsRoot.getUri().resolve(
|
||||
uploadFileAsBlob(src, storageAccount, fsRoot.getContainer(), destRelatedBlobName)).toString());
|
||||
final String destRelatedBlobName = getDestRelativePath() + src.getName();
|
||||
|
||||
return Observable.fromCallable(() ->
|
||||
getFsRoot().getUri().resolve(uploadFileAsBlob(src, destRelatedBlobName)).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,13 +21,13 @@ public interface Deployable {
|
|||
Observable<String> deploy(final File src);
|
||||
|
||||
/**
|
||||
* Get a related path to destination parent folder path based for uploading artifacts.
|
||||
* Get a relative path to destination parent folder path based for uploading artifacts.
|
||||
* such as: `SparkSubmission/2019/01/20/_random_uuid_/`
|
||||
*
|
||||
* @return a related path for artifacts uploading. The default method return a path
|
||||
* with destination folder, date and random UUID
|
||||
*/
|
||||
default String getRelatedDestParentPath() {
|
||||
default String getDestRelativePath() {
|
||||
int year = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.YEAR);
|
||||
int month = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.MONTH) + 1;
|
||||
int day = Calendar.getInstance(TimeZone.getTimeZone("UTC")).get(Calendar.DAY_OF_MONTH);
|
||||
|
|
|
@ -235,8 +235,7 @@ public class LivySparkBatch implements SparkBatchJob, Logger {
|
|||
appIdWithLog.getRight().getLog().size() == 0
|
||||
&& ((!StringUtils.equalsIgnoreCase(getState(), "starting")
|
||||
&& appIdWithLog.getLeft() != null)
|
||||
|| (StringUtils.equalsIgnoreCase(getState(), "dead")
|
||||
&& appIdWithLog.getLeft() == null));
|
||||
|| StringUtils.equalsIgnoreCase(getState(), "dead"));
|
||||
|
||||
getSparkJobApplicationId()
|
||||
.flatMap(applicationId -> getSparkBatchLogRequest(start.get(), maxLinesPerGet), Pair::of)
|
||||
|
|
|
@ -3,19 +3,19 @@
|
|||
|
||||
package com.microsoft.azure.spark.tools.job;
|
||||
|
||||
import com.microsoft.azure.spark.tools.clusters.LivyCluster;
|
||||
import com.microsoft.azure.spark.tools.events.MessageInfoType;
|
||||
import com.microsoft.azure.spark.tools.http.HttpObservable;
|
||||
import com.microsoft.azure.spark.tools.log.Logger;
|
||||
import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches;
|
||||
import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches.Options;
|
||||
import com.microsoft.azure.spark.tools.utils.Pair;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import rx.Observer;
|
||||
import rx.subjects.PublishSubject;
|
||||
|
||||
public class LivySparkBatchFactory implements SparkBatchJobFactory, Logger {
|
||||
import com.microsoft.azure.spark.tools.clusters.LivyCluster;
|
||||
import com.microsoft.azure.spark.tools.events.MessageInfoType;
|
||||
import com.microsoft.azure.spark.tools.http.HttpObservable;
|
||||
import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches;
|
||||
import com.microsoft.azure.spark.tools.restapi.livy.batches.api.PostBatches.Options;
|
||||
import com.microsoft.azure.spark.tools.utils.Pair;
|
||||
|
||||
public class LivySparkBatchFactory implements SparkBatchJobFactory {
|
||||
private final LivyCluster cluster;
|
||||
private final PostBatches submissionParameter;
|
||||
private @Nullable HttpObservable http;
|
||||
|
|
|
@ -66,6 +66,10 @@ public final class WasbUri {
|
|||
return this.absolutePath.get();
|
||||
}
|
||||
|
||||
public String getHadoopBlobFsPropertyKey() {
|
||||
return String.format("fs.azure.account.key.%s.blob.%s", getStorageAccount(), getEndpointSuffix());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return rawUri.toString();
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
package com.microsoft.azure.spark.tools.job;
|
||||
|
||||
import cucumber.api.java.Before;
|
||||
import cucumber.api.java.en.And;
|
||||
import cucumber.api.java.en.Given;
|
||||
import cucumber.api.java.en.Then;
|
||||
import org.mockito.Mockito;
|
||||
import rx.Observable;
|
||||
|
||||
import com.microsoft.azure.spark.tools.utils.WasbUri;
|
||||
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
|
||||
import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
import com.microsoft.azure.storage.blob.CloudBlockBlob;
|
||||
import com.microsoft.azure.storage.core.Base64;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.CALLS_REAL_METHODS;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.withSettings;
|
||||
|
||||
public class AzureBlobStorageDeployScenario {
|
||||
private AzureBlobStorageDeploy deploy;
|
||||
private Observable<String> deploySub;
|
||||
private CloudStorageAccount mockStorageAccount;
|
||||
private CloudBlobClient mockBlobClient;
|
||||
private CloudBlobContainer mockBlobContainer;
|
||||
private CloudBlockBlob mockBlob;
|
||||
|
||||
static class DummyFileInputStream extends InputStream {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static class DummyDeploy implements Deployable {
|
||||
@Override
|
||||
public Observable<String> deploy(File src) {
|
||||
return Observable.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void Setup() {
|
||||
|
||||
}
|
||||
|
||||
@Given("create an AzureBlobStorageDeploy with access key {string} and fs root {string}")
|
||||
public void createAzureBlobStorageDeploy(String key, String fsRoot) {
|
||||
this.deploy = new AzureBlobStorageDeploy(Base64.encode(key.getBytes()), WasbUri.parse(fsRoot));
|
||||
}
|
||||
|
||||
@Then("check the AzureBlobStorageDeploy storage account credential is {string} and account is {string}")
|
||||
public void checkTheAzureBlobStorageDeployStorageAccountCredential(String expectKey, String expectAccount) {
|
||||
StorageCredentialsAccountAndKey actual = (StorageCredentialsAccountAndKey)
|
||||
this.deploy.getStorageAccount().getCredentials();
|
||||
|
||||
assertEquals(expectKey, new String(actual.exportKey()));
|
||||
assertEquals(expectAccount, actual.getAccountName());
|
||||
}
|
||||
|
||||
@Given("create an AzureBlobStorageDeploy with mocked storage account and fs root {string}")
|
||||
public void createAnAzureBlobStorageDeployWithMockedStorageAccount(String fsRoot) throws Throwable {
|
||||
// Mock storage account
|
||||
this.mockStorageAccount = Mockito.mock(CloudStorageAccount.class);
|
||||
this.mockBlobClient = Mockito.mock(CloudBlobClient.class);
|
||||
this.mockBlobContainer = Mockito.mock(CloudBlobContainer.class);
|
||||
this.mockBlob = Mockito.mock(CloudBlockBlob.class);
|
||||
|
||||
WasbUri fsUri = WasbUri.parse(fsRoot);
|
||||
this.deploy = Mockito.mock(
|
||||
AzureBlobStorageDeploy.class,
|
||||
withSettings()
|
||||
.useConstructor(this.mockStorageAccount, fsUri)
|
||||
.defaultAnswer(CALLS_REAL_METHODS));
|
||||
|
||||
doReturn(mockBlobClient).when(mockStorageAccount).createCloudBlobClient();
|
||||
doReturn(mockBlobContainer).when(mockBlobClient).getContainerReference(fsUri.getContainer());
|
||||
doNothing().when(mockBlobContainer).create(BlobContainerPublicAccessType.BLOB, null, null);
|
||||
}
|
||||
|
||||
@And("perform an Azure blob deploy operation for the local file {string}")
|
||||
public void performAzureBlobDeploy(String mockFilePath) throws Throwable {
|
||||
File mockFile = new File(mockFilePath);
|
||||
|
||||
doReturn(this.mockBlob).when(this.mockBlobContainer)
|
||||
.getBlockBlobReference(argThat(someString -> someString.endsWith(mockFile.getName())));
|
||||
|
||||
doReturn(this.deploy.getFsRoot().getUri().resolve(
|
||||
new DummyDeploy().getDestRelativePath() + mockFile.getName())).when(this.mockBlob)
|
||||
.getUri();
|
||||
|
||||
doReturn(new DummyFileInputStream()).when(this.deploy).createFileInputStream(mockFile);
|
||||
|
||||
this.deploySub = this.deploy.deploy(mockFile);
|
||||
}
|
||||
|
||||
@Then("check the destination URI should match regex {string}")
|
||||
public void checkTheDestinationURIRegexMatch(String expectRegex) {
|
||||
String actualDestUri = this.deploySub.toBlocking().single();
|
||||
|
||||
assertTrue("The actual deploy destination URI " + actualDestUri + " doesn't match " + expectRegex,
|
||||
Pattern.compile(expectRegex).matcher(actualDestUri).matches());
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
// Copyright (c) Microsoft Corporation.
|
||||
// Licensed under the MIT license.
|
||||
|
||||
package com.microsoft.azure.spark.tools.job;
|
||||
|
||||
import cucumber.api.CucumberOptions;
|
||||
import cucumber.api.junit.Cucumber;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
@RunWith(Cucumber.class)
|
||||
@CucumberOptions(
|
||||
plugin = {"html:target/cucumber"},
|
||||
features = { "src/test/resources/com/microsoft/azure/spark/tools/" +
|
||||
"job/AzureBlobStorageDeployScenario.feature"}
|
||||
)
|
||||
public class AzureBlobStorageDeployTest {
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
Feature: AzureBlobStorageDeploy unit tests
|
||||
|
||||
Scenario: Create an AzureBlobStorageDeploy with access key and fs root
|
||||
Given create an AzureBlobStorageDeploy with access key 'mockkey' and fs root 'wasbs://mycontainer@mockaccount.blob.core.windows.net/'
|
||||
Then check the AzureBlobStorageDeploy storage account credential is 'mockkey' and account is 'mockaccount'
|
||||
|
||||
Scenario: Create an AzureBlobStorageDeploy with mocked storage account key and fs root
|
||||
Given create an AzureBlobStorageDeploy with mocked storage account and fs root 'wasbs://mycontainer@mockaccount.blob.core.windows.net/'
|
||||
And perform an Azure blob deploy operation for the local file '/tmp/mock_artifact.jar'
|
||||
Then check the destination URI should match regex 'wasbs://mycontainer@mockaccount.blob.core.windows.net/SparkSubmission/\d{4}/\d{2}/\d{2}/[^/]+/mock_artifact.jar'
|
|
@ -0,0 +1 @@
|
|||
mock-maker-inline
|
Загрузка…
Ссылка в новой задаче