Fix Livy Spark batch awaitDone issues and add caller into header with UT

This commit is contained in:
Wei Zhang 2019-09-03 10:49:37 +08:00 committed by Zhang Wei
Parent b69f1794ad
Commit 9912df79a3
4 changed files: 79 additions and 13 deletions

View file

@ -3,10 +3,12 @@
package com.microsoft.azure.spark.tools.job;
import com.google.common.collect.Streams;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.NameValuePair;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.checkerframework.checker.nullness.qual.Nullable;
import rx.Emitter;
import rx.Observable;
@ -29,6 +31,7 @@ import com.microsoft.azure.spark.tools.utils.Pair;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@ -325,15 +328,14 @@ public class LivySparkBatch implements SparkBatchJob, Logger {
@Override
public Observable<Pair<String, String>> awaitDone() {
    // Poll the Livy batch endpoint until the job reaches a terminal state,
    // then emit exactly one (state, concatenated submission logs) pair.
    // NOTE(review): the original span interleaved the pre-fix chain
    // (takeUntil/filter on !isDone, which emitted the first NON-terminal
    // snapshot and stopped) with the fixed chain; only the fix is kept here.
    return get()
            // Re-subscribe (probe again) after a fixed delay, logging first.
            .repeatWhen(ob -> ob
                    .doOnNext(v -> log().debug("Deploy " + 1 //getDelaySeconds()
                            + " seconds for next job status probe"))
                    .delay(
                            1, //getDelaySeconds(),
                            TimeUnit.SECONDS))
            // Stop repeating once the batch state is terminal...
            .takeUntil(batch -> isDone(batch.state))
            // ...and let only that terminal snapshot through.
            .filter(batch -> isDone(batch.state))
            .map(batch -> new Pair<>(batch.state, String.join("\n", batch.submissionLogs)));
}
@ -356,7 +358,11 @@ public class LivySparkBatch implements SparkBatchJob, Logger {
}
public Observable<LivySparkBatch> get() {
return getSparkBatchRequest()
final String caller = Streams.findLast(Arrays.stream(Thread.currentThread().getStackTrace()).limit(3))
.map(StackTraceElement::getMethodName)
.orElse("Unknown");
return getSparkBatchRequest(caller)
.map(this::updateWithBatchResponse)
.defaultIfEmpty(this);
}
@ -384,10 +390,13 @@ public class LivySparkBatch implements SparkBatchJob, Logger {
.delete(uri.toString(), emptyList(), getHeadersToAddOrReplace()));
}
private Observable<Batch> getSparkBatchRequest() {
private Observable<Batch> getSparkBatchRequest(final String caller) {
final List<Header> headers = new ArrayList<>(getHeadersToAddOrReplace());
headers.add(new BasicHeader("X-Invoked-By-Method", caller));
return Observable.fromCallable(this::getUri)
.flatMap(uri -> getHttp()
.get(uri.toString(), emptyList(), getHeadersToAddOrReplace(), Batch.class)
.get(uri.toString(), emptyList(), headers, Batch.class)
.map(Pair::getFirst));
}

View file

@ -7,6 +7,10 @@ import com.microsoft.azure.spark.tools.http.AmbariHttpObservable;
import com.microsoft.azure.spark.tools.http.HttpObservable;
import com.microsoft.azure.spark.tools.utils.LaterInit;
import com.microsoft.azure.spark.tools.utils.MockHttpService;
import com.microsoft.azure.spark.tools.utils.Pair;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.stubbing.Scenario;
import cucumber.api.java.After;
import cucumber.api.java.Before;
import cucumber.api.java.en.And;
@ -17,6 +21,8 @@ import uk.org.lidalia.slf4jtest.TestLogger;
import uk.org.lidalia.slf4jtest.TestLoggerFactory;
import java.net.URI;
import java.util.List;
import java.util.Map;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
@ -121,4 +127,30 @@ public class LivySparkBatchScenario {
// Cucumber step helper: stub the mocked job's batch id so subsequent
// Livy REST calls target /batch/<expectBatchId>.
// doReturn(..).when(..) is used (rather than when(..).thenReturn(..)) so the
// real getBatchId() is never invoked — presumably jobMock is a spy; confirm.
public void mockSparkJobBatchIdTo(int expectBatchId) {
doReturn(expectBatchId).when(jobMock).getBatchId();
}
@Given("setup a mock Livy service with the following scenario {string}")
public void setupAMockLivyServiceWithTheFollowingScenarioAwaitJobIsDoneUT(String scenario,
        List<Map<String, String>> stubs) {
    // Register one scenario-based WireMock stub per data-table row, so the
    // mock service walks the PREV_STATE -> NEXT_STATE transitions in order.
    for (final Map<String, String> row : stubs) {
        final int responseStatus = Integer.parseInt(row.get("RESPONSE_STATUS"));
        httpServerMock.stub(
                scenario,
                row.get("PREV_STATE"),
                row.get("NEXT_STATE"),
                row.get("ACTION"),
                row.get("URI"),
                responseStatus,
                row.get("RESPONSE_BODY"));
    }
}
@Then("await Livy Spark job done should get state {string}")
public void awaitLivySparkJobDoneShouldGetStateSuccess(String expect) {
    // Block until awaitDone() emits its single terminal (state, logs) pair
    // and compare the reported state against the expectation.
    final String actualState = jobMock.awaitDone()
            .toBlocking()
            .single()
            .getFirst();
    assertEquals(expect, actualState);
}
}

View file

@ -77,6 +77,17 @@ public class MockHttpService {
.withStatus(statusCode).withBody(normalizeResponse(response))));
}
// Register a stateful (scenario-based) stub: when the scenario is in
// prevState and a matching request arrives, reply with the given status and
// body, then advance the scenario to nextState.
public void stub(String scenario, String prevState, String nextState, String action, String uri, int statusCode, String response) {
    // Point the static WireMock client at this mock's HTTP port first.
    WireMock.configureFor(getPort());
    WireMock.stubFor(WireMock.request(action, WireMock.urlEqualTo(uri))
            .inScenario(scenario)
            .whenScenarioStateIs(prevState)
            .willReturn(WireMock.aResponse()
                    .withStatus(statusCode)
                    .withBody(normalizeResponse(response)))
            .willSetStateTo(nextState));
}
public void stubHttps(String action, String path, int statusCode, String response) {
WireMock.configureFor("https", "localhost", getHttpsPort());
WireMock.stubFor(WireMock.request(
@ -187,7 +198,7 @@ public class MockHttpService {
// .notifier(new ConsoleNotifier(true))
// uncomment for debugging with local proxy
// .proxyVia("localhost", 8888)
.proxyVia("localhost", 8888)
);
// Clean up all history recordings

View file

@ -19,3 +19,17 @@ Feature: LivySparkBatch unit tests
And mock Spark job batch id to 9
Then getting spark job application id, '/batch/9' should be got with 3 times retried
Scenario: await Spark job is done behavior
Given setup a mock Livy service with the following scenario 'awaitJobIsDoneUT'
| ACTION | URI | RESPONSE_STATUS | RESPONSE_BODY | PREV_STATE | NEXT_STATE |
| GET | /batch/9 | 200 | {"id": 9, "state": "not_started"} | Started | starting_1 |
| GET | /batch/9 | 200 | {"id": 9, "state": "starting"} | starting_1 | starting_2 |
| GET | /batch/9 | 200 | {"id": 9, "state": "starting"} | starting_2 | starting_3 |
| GET | /batch/9 | 200 | {"id": 9, "state": "running"} | starting_3 | running_1 |
| GET | /batch/9 | 200 | {"id": 9, "state": "running"} | running_1 | running_2 |
| GET | /batch/9 | 200 | {"id": 9, "state": "success"} | running_2 | success_1 |
| GET | /batch/9 | 200 | {"id": 9, "state": "dead"} | success_1 | end |
And mock Spark job connect URI to be 'http://localhost:$port/batch/'
And mock Spark job batch id to 9
Then await Livy Spark job done should get state 'success'