Merge pull request #11 from xingwu1/master
Workaround the autorest bugs and Fix auth bug
This commit is contained in:
Коммит
68d6c56d85
6
pom.xml
6
pom.xml
|
@ -70,6 +70,12 @@
|
|||
<version>4.0.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<version>2.4</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<build>
|
||||
<plugins>
|
||||
|
|
|
@ -20,9 +20,10 @@ import com.microsoft.azure.batch.protocol.models.FileListFromTaskOptions;
|
|||
import com.microsoft.azure.batch.protocol.models.FileProperties;
|
||||
import com.microsoft.azure.batch.protocol.models.NodeFile;
|
||||
import com.microsoft.rest.ServiceResponseWithHeaders;
|
||||
import rx.exceptions.Exceptions;
|
||||
import rx.functions.Func1;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.*;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -284,7 +285,27 @@ public class FileOperations implements IInheritedBehaviors {
|
|||
BehaviorManager bhMgr = new BehaviorManager(this.customBehaviors(), additionalBehaviors);
|
||||
bhMgr.applyRequestBehaviors(options);
|
||||
|
||||
return this._parentBatchClient.protocolLayer().files().getFromTask(jobId, taskId, fileName, options);
|
||||
// Duplicate the stream due to AutoRest issue https://github.com/Azure/autorest/issues/1385
|
||||
ByteArrayOutputStream output = this._parentBatchClient.protocolLayer().files().getFromTaskAsync(jobId, taskId, fileName, options)
|
||||
.map(new Func1<InputStream, ByteArrayOutputStream>() {
|
||||
@Override
|
||||
public ByteArrayOutputStream call(InputStream input) {
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
byte[] data = new byte[16384];
|
||||
int nRead;
|
||||
try {
|
||||
while ((nRead = input.read(data, 0, data.length)) != -1) {
|
||||
buffer.write(data, 0, nRead);
|
||||
}
|
||||
buffer.flush();
|
||||
return buffer;
|
||||
} catch (IOException e) {
|
||||
throw Exceptions.propagate(e);
|
||||
}
|
||||
}
|
||||
}).toBlocking().single();
|
||||
|
||||
return new ByteArrayInputStream(output.toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -317,7 +338,27 @@ public class FileOperations implements IInheritedBehaviors {
|
|||
BehaviorManager bhMgr = new BehaviorManager(this.customBehaviors(), additionalBehaviors);
|
||||
bhMgr.applyRequestBehaviors(options);
|
||||
|
||||
return this._parentBatchClient.protocolLayer().files().getFromComputeNode(poolId, nodeId, fileName, options);
|
||||
// Duplicate the stream due to AutoRest issue https://github.com/Azure/autorest/issues/1385
|
||||
ByteArrayOutputStream output = this._parentBatchClient.protocolLayer().files().getFromComputeNodeAsync(poolId, nodeId, fileName, options)
|
||||
.map(new Func1<InputStream, ByteArrayOutputStream>() {
|
||||
@Override
|
||||
public ByteArrayOutputStream call(InputStream input) {
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
byte[] data = new byte[16384];
|
||||
int nRead;
|
||||
try {
|
||||
while ((nRead = input.read(data, 0, data.length)) != -1) {
|
||||
buffer.write(data, 0, nRead);
|
||||
}
|
||||
buffer.flush();
|
||||
return buffer;
|
||||
} catch (IOException e) {
|
||||
throw Exceptions.propagate(e);
|
||||
}
|
||||
}
|
||||
}).toBlocking().single();
|
||||
|
||||
return new ByteArrayInputStream(output.toByteArray());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,8 @@ import javax.crypto.Mac;
|
|||
import javax.crypto.spec.SecretKeySpec;
|
||||
import java.io.IOException;
|
||||
import java.net.URLDecoder;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
|
@ -31,6 +33,8 @@ class BatchSharedKeyCredentialsInterceptor implements Interceptor {
|
|||
|
||||
private final BatchSharedKeyCredentials credentials;
|
||||
|
||||
private Mac hmacSha256;
|
||||
|
||||
/**
|
||||
* Constructor for BatchSharedKeyCredentialsInterceptor
|
||||
*
|
||||
|
@ -62,20 +66,26 @@ class BatchSharedKeyCredentialsInterceptor implements Interceptor {
|
|||
return headerValue;
|
||||
}
|
||||
|
||||
private String sign(String accessKey, String stringToSign) {
|
||||
private synchronized String sign(String stringToSign) {
|
||||
try {
|
||||
// Encoding the Signature
|
||||
// Signature=Base64(HMAC-SHA256(UTF8(StringToSign)))
|
||||
Mac hmac = Mac.getInstance("hmacSHA256");
|
||||
hmac.init(new SecretKeySpec(Base64.decodeBase64(accessKey),
|
||||
"hmacSHA256"));
|
||||
byte[] digest = hmac.doFinal(stringToSign.getBytes("UTF-8"));
|
||||
byte[] digest = getHmac256().doFinal(stringToSign.getBytes("UTF-8"));
|
||||
return Base64.encodeBase64String(digest);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException("accessKey", e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized Mac getHmac256() throws NoSuchAlgorithmException, InvalidKeyException {
|
||||
if (this.hmacSha256 == null) {
|
||||
// Initializes the HMAC-SHA256 Mac and SecretKey.
|
||||
this.hmacSha256 = Mac.getInstance("HmacSHA256");
|
||||
this.hmacSha256.init(new SecretKeySpec(Base64.decodeBase64(this.credentials.keyValue()), "HmacSHA256"));
|
||||
}
|
||||
return this.hmacSha256;
|
||||
}
|
||||
|
||||
private Request signHeader(Request request) throws IOException {
|
||||
|
||||
Request.Builder builder = request.newBuilder();
|
||||
|
@ -141,7 +151,7 @@ class BatchSharedKeyCredentialsInterceptor implements Interceptor {
|
|||
|
||||
signature = signature + "/"
|
||||
+ credentials.accountName().toLowerCase() + "/"
|
||||
+ request.url().uri().getPath().replaceAll("^[/]+", "");
|
||||
+ request.url().uri().getRawPath().replaceAll("^[/]+", "");
|
||||
// We temporary change client side auth code generator to bypass server
|
||||
// bug 4092533
|
||||
signature = signature.replace("%5C", "/").replace("%2F", "/");
|
||||
|
@ -156,22 +166,18 @@ class BatchSharedKeyCredentialsInterceptor implements Interceptor {
|
|||
.toLowerCase(Locale.US);
|
||||
queryComponents.put(
|
||||
key,
|
||||
key
|
||||
+ ":"
|
||||
+ URLDecoder.decode(pair.substring(idx + 1),
|
||||
"UTF-8"));
|
||||
key + ":" + URLDecoder.decode(pair.substring(idx + 1),"UTF-8"));
|
||||
}
|
||||
|
||||
for (Map.Entry<String, String> entry : queryComponents.entrySet()) {
|
||||
signature = signature + "\n" + entry.getValue();
|
||||
}
|
||||
}
|
||||
String signedSignature = sign(credentials.keyValue(), signature);
|
||||
String signedSignature = sign(signature);
|
||||
String authorization = "SharedKey " + credentials.accountName()
|
||||
+ ":" + signedSignature;
|
||||
builder.header("Authorization", authorization);
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -43,13 +43,13 @@ public class FileGetPropertiesFromComputeNodeHeaders {
|
|||
* particular, you can pass the ETag to one of the If-Modified-Since,
|
||||
* If-Unmodified-Since, If-Match or If-None-Match headers.
|
||||
*/
|
||||
@JsonProperty(value = "ETag")
|
||||
@JsonProperty(value = "etag")
|
||||
private String eTag;
|
||||
|
||||
/**
|
||||
* The time at which the resource was last modified.
|
||||
*/
|
||||
@JsonProperty(value = "Last-Modified")
|
||||
@JsonProperty(value = "last-modified")
|
||||
private DateTimeRfc1123 lastModified;
|
||||
|
||||
/**
|
||||
|
@ -79,13 +79,13 @@ public class FileGetPropertiesFromComputeNodeHeaders {
|
|||
/**
|
||||
* The content type of the file.
|
||||
*/
|
||||
@JsonProperty(value = "Content-Type")
|
||||
@JsonProperty(value = "content-type")
|
||||
private String contentType;
|
||||
|
||||
/**
|
||||
* The length of the file.
|
||||
*/
|
||||
@JsonProperty(value = "Content-Length")
|
||||
@JsonProperty(value = "content-length")
|
||||
private Long contentLength;
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,13 +43,13 @@ public class FileGetPropertiesFromTaskHeaders {
|
|||
* particular, you can pass the ETag to one of the If-Modified-Since,
|
||||
* If-Unmodified-Since, If-Match or If-None-Match headers.
|
||||
*/
|
||||
@JsonProperty(value = "ETag")
|
||||
@JsonProperty(value = "etag")
|
||||
private String eTag;
|
||||
|
||||
/**
|
||||
* The time at which the resource was last modified.
|
||||
*/
|
||||
@JsonProperty(value = "Last-Modified")
|
||||
@JsonProperty(value = "last-modified")
|
||||
private DateTimeRfc1123 lastModified;
|
||||
|
||||
/**
|
||||
|
@ -79,13 +79,13 @@ public class FileGetPropertiesFromTaskHeaders {
|
|||
/**
|
||||
* The content type of the file.
|
||||
*/
|
||||
@JsonProperty(value = "Content-Type")
|
||||
@JsonProperty(value = "content-type")
|
||||
private String contentType;
|
||||
|
||||
/**
|
||||
* The length of the file.
|
||||
*/
|
||||
@JsonProperty(value = "Content-Length")
|
||||
@JsonProperty(value = "content-length")
|
||||
private Long contentLength;
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,6 +17,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.InvalidKeyException;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
|
@ -162,4 +163,44 @@ abstract class BatchTestBase {
|
|||
String sas = blob.generateSharedAccessSignature(policy, null);
|
||||
return blob.getUri() + "?" + sas;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait all tasks under a specified job to be completed
|
||||
* @param client batch client instance
|
||||
* @param jobId job id
|
||||
* @param expiryTime the waiting period
|
||||
* @return if task completed in time, return true, otherwise, return false
|
||||
* @throws BatchErrorException
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
static boolean waitForTasksToComplete(BatchClient client, String jobId, Duration expiryTime) throws BatchErrorException, IOException, InterruptedException {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long elapsedTime = 0L;
|
||||
|
||||
while (elapsedTime < expiryTime.toMillis()) {
|
||||
List<CloudTask> taskCollection = client.taskOperations().listTasks(jobId, new DetailLevel.Builder().withSelectClause("id, state").build());
|
||||
|
||||
boolean allComplete = true;
|
||||
for (CloudTask task : taskCollection) {
|
||||
if (task.state() != TaskState.COMPLETED) {
|
||||
allComplete = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (allComplete) {
|
||||
// All tasks completed
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check again after 10 seconds
|
||||
Thread.sleep(10 * 1000);
|
||||
elapsedTime = (new Date()).getTime() - startTime;
|
||||
}
|
||||
|
||||
// Timeout, return false
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
* Licensed under the MIT License. See License.txt in the project root for
|
||||
* license information.
|
||||
*/
|
||||
|
||||
package com.microsoft.azure.batch;
|
||||
|
||||
import com.microsoft.azure.batch.protocol.models.*;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import rx.exceptions.Exceptions;
|
||||
import rx.functions.Func1;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
public class FileTests extends BatchTestBase {
|
||||
static CloudPool livePool;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
createClient(AuthMode.SharedKey);
|
||||
String poolId = getStringWithUserNamePrefix("-testpool");
|
||||
livePool = createIfNotExistPaaSPool(poolId);
|
||||
Assert.assertNotNull(livePool);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() throws Exception {
|
||||
try {
|
||||
//batchClient.poolOperations().deletePool(livePool.id());
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignore any clean up exception
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canReadFromTaskFile() throws Exception {
|
||||
// CREATE
|
||||
String jobId = getStringWithUserNamePrefix("-Job-" + (new Date()).toString().replace(' ', '-').replace(':', '-').replace('.', '-'));
|
||||
String taskId = "mytask";
|
||||
Duration TASK_COMPLETE_TIMEOUT = Duration.ofMinutes(1);
|
||||
|
||||
try {
|
||||
|
||||
PoolInformation poolInfo = new PoolInformation();
|
||||
poolInfo.withPoolId(livePool.id());
|
||||
batchClient.jobOperations().createJob(jobId, poolInfo);
|
||||
|
||||
TaskAddParameter taskToAdd = new TaskAddParameter();
|
||||
taskToAdd.withId(taskId)
|
||||
.withCommandLine("cmd /c echo hello");
|
||||
batchClient.taskOperations().createTask(jobId, taskToAdd);
|
||||
|
||||
if (waitForTasksToComplete(batchClient, jobId, TASK_COMPLETE_TIMEOUT)) {
|
||||
List<NodeFile> files = batchClient.fileOperations().listFilesFromTask(jobId, taskId);
|
||||
boolean found = false;
|
||||
for (NodeFile f : files) {
|
||||
if (f.name().equals("stdout.txt")) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue(found);
|
||||
|
||||
InputStream stream = batchClient.fileOperations().getFileFromTask(jobId, taskId, "stdout.txt");
|
||||
String fileContent = IOUtils.toString(stream, "UTF-8");
|
||||
Assert.assertEquals(fileContent, "hello\r\n");
|
||||
stream.close();
|
||||
|
||||
String output = batchClient.protocolLayer().files().getFromTaskAsync(jobId, taskId, "stdout.txt").map(new Func1<InputStream, String>() {
|
||||
@Override
|
||||
public String call(InputStream input) {
|
||||
try {
|
||||
return IOUtils.toString(input, "UTF-8");
|
||||
} catch (IOException e) {
|
||||
throw Exceptions.propagate(e);
|
||||
}
|
||||
}
|
||||
}).toBlocking().single();
|
||||
Assert.assertEquals(fileContent, "hello\r\n");
|
||||
|
||||
FileProperties properties = batchClient.fileOperations().getFilePropertiesFromTask(jobId, taskId, "stdout.txt");
|
||||
Assert.assertEquals(7, properties.contentLength());
|
||||
} else {
|
||||
throw new TimeoutException("Task did not complete within the specified timeout");
|
||||
}
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
batchClient.jobOperations().deleteJob(jobId);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Ignore here
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canReadFromNode() throws Exception {
|
||||
// CREATE
|
||||
String jobId = getStringWithUserNamePrefix("-Job-" + (new Date()).toString().replace(' ', '-').replace(':', '-').replace('.', '-'));
|
||||
String taskId = "mytask";
|
||||
Duration TASK_COMPLETE_TIMEOUT = Duration.ofMinutes(1);
|
||||
|
||||
try {
|
||||
PoolInformation poolInfo = new PoolInformation();
|
||||
poolInfo.withPoolId(livePool.id());
|
||||
batchClient.jobOperations().createJob(jobId, poolInfo);
|
||||
|
||||
TaskAddParameter taskToAdd = new TaskAddParameter();
|
||||
taskToAdd.withId(taskId)
|
||||
.withCommandLine("cmd /c echo hello");
|
||||
batchClient.taskOperations().createTask(jobId, taskToAdd);
|
||||
|
||||
if (waitForTasksToComplete(batchClient, jobId, TASK_COMPLETE_TIMEOUT)) {
|
||||
CloudTask task = batchClient.taskOperations().getTask(jobId, taskId);
|
||||
String nodeId = task.nodeInfo().nodeId();
|
||||
|
||||
List<NodeFile> files = batchClient.fileOperations().listFilesFromComputeNode(livePool.id(), nodeId, true, null);
|
||||
String fileName = null;
|
||||
for (NodeFile f : files) {
|
||||
if (f.name().endsWith("stdout.txt")) {
|
||||
fileName = f.name();
|
||||
break;
|
||||
}
|
||||
}
|
||||
Assert.assertNotNull(fileName);
|
||||
|
||||
|
||||
InputStream stream = batchClient.fileOperations().getFileFromComputeNode(livePool.id(), nodeId, fileName);
|
||||
String fileContent = IOUtils.toString(stream, "UTF-8");
|
||||
Assert.assertEquals(fileContent, "hello\r\n");
|
||||
stream.close();
|
||||
|
||||
String output = batchClient.protocolLayer().files().getFromComputeNodeAsync(livePool.id(), nodeId, fileName).map(new Func1<InputStream, String>() {
|
||||
@Override
|
||||
public String call(InputStream input) {
|
||||
try {
|
||||
return IOUtils.toString(input, "UTF-8");
|
||||
} catch (IOException e) {
|
||||
throw Exceptions.propagate(e);
|
||||
}
|
||||
}
|
||||
}).toBlocking().single();
|
||||
Assert.assertEquals(fileContent, "hello\r\n");
|
||||
|
||||
FileProperties properties = batchClient.fileOperations().getFilePropertiesFromComputeNode(livePool.id(), nodeId, fileName);
|
||||
Assert.assertEquals(7, properties.contentLength());
|
||||
} else {
|
||||
throw new TimeoutException("Task did not complete within the specified timeout");
|
||||
}
|
||||
}
|
||||
finally {
|
||||
try {
|
||||
batchClient.jobOperations().deleteJob(jobId);
|
||||
}
|
||||
catch (Exception e) {
|
||||
// Ignore here
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -207,7 +207,7 @@ public class TaskTests extends BatchTestBase {
|
|||
|
||||
@Test
|
||||
public void testAddMultiTasks() throws Exception {
|
||||
String jobId = getStringWithUserNamePrefix("-Job-" + (new Date()).toString().replace(' ', '-').replace(':', '-').replace('.', '-'));
|
||||
String jobId = getStringWithUserNamePrefix("-Job1-" + (new Date()).toString().replace(' ', '-').replace(':', '-').replace('.', '-'));
|
||||
|
||||
PoolInformation poolInfo = new PoolInformation();
|
||||
poolInfo.withPoolId(livePool.id());
|
||||
|
|
Загрузка…
Ссылка в новой задаче