Add GRPC e2e test framework as well as test cases.

This commit is contained in:
Junyi Yi 2018-04-18 17:29:46 -07:00
Родитель c42b0bc3ca
Коммит 337c9b3e33
9 изменённых файлов: 328 добавлений и 15 удалений

Просмотреть файл

@ -182,6 +182,28 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.21.0</version>
<configuration>
<workingDirectory>${project.build.directory}</workingDirectory>
<systemProperties>
<property>
<name>testing-project-jar</name>
<value>${project.artifactId}-${project.version}-tests.jar</value>
</property>
</systemProperties>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>test-compile</phase>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Просмотреть файл

@ -10,16 +10,19 @@ import org.apache.commons.cli.*;
* The entry point of the Java Language Worker. Every component could get the command line options from this singleton
* Application instance, and typically that instance will be passed to your components as constructor arguments.
*/
public final class Application {
public final class Application implements IApplication {
private Application(String[] args) {
this.parseCommandLine(args);
}
@Override
public String getHost() { return this.host; }
int getPort() { return this.port; }
@Override
public int getPort() { return this.port; }
@Override
public boolean logToConsole() { return this.logToConsole; }
private String getWorkerId() { return this.workerId; }
private String getRequestId() { return this.requestId; }
boolean logToConsole() { return this.logToConsole; }
private void printUsage() {
HelpFormatter formatter = new HelpFormatter();
@ -96,7 +99,7 @@ public final class Application {
.build());
public static void main(String[] args) throws IOException {
public static void main(String[] args) {
System.out.println("Microsoft Azure Functions Java Runtime [build " + version() + "]");
Application app = new Application(args);
if (!app.isCommandlineValid()) {
@ -104,7 +107,7 @@ public final class Application {
System.exit(1);
} else {
try (JavaWorkerClient client = new JavaWorkerClient(app)) {
client.listen(app.getWorkerId(), app.getRequestId());
client.listen(app.getWorkerId(), app.getRequestId()).get();
} catch (Exception ex) {
WorkerLogManager.getSystemLogger().log(Level.SEVERE, "Unexpected Exception causes system to exit", ex);
System.exit(-1);

Просмотреть файл

@ -0,0 +1,7 @@
package com.microsoft.azure.webjobs.script;
public interface IApplication {
boolean logToConsole();
String getHost();
int getPort();
}

Просмотреть файл

@ -19,8 +19,8 @@ import com.microsoft.azure.webjobs.script.rpc.messages.*;
* Grpc client talks with the Azure Functions Runtime Host. It will dispatch to different message handlers according to the inbound message type.
* Thread-Safety: Single thread.
*/
class JavaWorkerClient implements AutoCloseable {
JavaWorkerClient(Application app) {
public class JavaWorkerClient implements AutoCloseable {
public JavaWorkerClient(IApplication app) {
WorkerLogManager.initialize(this, app.logToConsole());
this.channel = ManagedChannelBuilder.forAddress(app.getHost(), app.getPort()).usePlaintext(true).build();
this.peer = new AtomicReference<>(null);
@ -39,14 +39,10 @@ class JavaWorkerClient implements AutoCloseable {
this.handlerSuppliers.put(StreamingMessage.ContentCase.INVOCATION_REQUEST, () -> new InvocationRequestHandler(broker));
}
void listen(String workerId, String requestId) throws Exception {
try (StreamingMessagePeer peer = new StreamingMessagePeer()) {
this.peer.set(peer);
peer.send(requestId, new StartStreamHandler(workerId));
peer.getListeningTask().get();
} finally {
this.peer.set(null);
}
public Future<Void> listen(String workerId, String requestId) {
this.peer.set(new StreamingMessagePeer());
this.peer.get().send(requestId, new StartStreamHandler(workerId));
return this.peer.get().getListeningTask();
}
void logToHost(LogRecord record, String invocationId) {
@ -58,8 +54,11 @@ class JavaWorkerClient implements AutoCloseable {
@Override
public void close() throws Exception {
this.peer.get().close();
this.peer.set(null);
this.channel.shutdownNow();
this.channel.awaitTermination(15, TimeUnit.SECONDS);
WorkerLogManager.deinitialize();
}
private class StreamingMessagePeer implements StreamObserver<StreamingMessage>, AutoCloseable {

Просмотреть файл

@ -13,6 +13,7 @@ public class WorkerLogManager {
public static Logger getInvocationLogger(String invocationId) { return INSTANCE.getInvocationLoggerImpl(invocationId); }
static void initialize(JavaWorkerClient client, boolean logToConsole) { INSTANCE.initializeImpl(client, logToConsole); }
static void deinitialize() { INSTANCE.deinitializempl(); }
private WorkerLogManager() {
this.client = null;
@ -30,6 +31,13 @@ public class WorkerLogManager {
addHostClientHandlers(this.hostLogger, null);
}
private void deinitializempl() {
assert this.client != null;
clearHandlers(this.hostLogger);
this.logToConsole = false;
this.client = null;
}
private Logger getInvocationLoggerImpl(String invocationId) {
Logger logger = Logger.getAnonymousLogger();
logger.setLevel(Level.ALL);

Просмотреть файл

@ -0,0 +1,58 @@
package com.microsoft.azure.webjobs.script.functional.tests;
import java.util.*;
import com.microsoft.azure.webjobs.script.rpc.messages.*;
import com.microsoft.azure.webjobs.script.test.categories.*;
import com.microsoft.azure.webjobs.script.test.utilities.*;
import org.junit.*;
import org.junit.experimental.categories.*;
import static org.junit.Assert.*;
public class SimpleParamReturnTests extends FunctionsTestBase {
public static String EmptyParameterFunction() {
return "Empty Parameter Result";
}
private static int intReturnValue = 0;
public static int ReturnIntFunction() {
return intReturnValue;
}
@Test
@Category({IntegrationTesting.class, SmokeTesting.class, FunctionalTesting.class})
public void testEmptyParameter() throws Exception {
try (FunctionsTestHost host = new FunctionsTestHost()) {
this.loadFunction(host, "emptyparam", "EmptyParameterFunction");
InvocationResponse response = host.call("getret", "emptyparam");
assertEquals(TypedData.DataCase.STRING, response.getReturnValue().getDataCase());
assertEquals("Empty Parameter Result", response.getReturnValue().getString());
}
}
@Test
@Category({IntegrationTesting.class, SmokeTesting.class, FunctionalTesting.class})
public void testIntReturn() throws Exception {
try (FunctionsTestHost host = new FunctionsTestHost()) {
this.loadFunction(host, "intreturn", "ReturnIntFunction");
intReturnValue = 13579;
InvocationResponse response = host.call("getret13579", "intreturn");
assertEquals(TypedData.DataCase.INT, response.getReturnValue().getDataCase());
assertEquals(intReturnValue, response.getReturnValue().getInt());
intReturnValue = 24680;
response = host.call("getret24680", "intreturn");
assertEquals(TypedData.DataCase.INT, response.getReturnValue().getDataCase());
assertEquals(intReturnValue, response.getReturnValue().getInt());
for (int i = 0; i < 100; i++) {
intReturnValue = new Random().nextInt();
response = host.call("getretloop" + intReturnValue, "intreturn");
assertEquals(TypedData.DataCase.INT, response.getReturnValue().getDataCase());
assertEquals(intReturnValue, response.getReturnValue().getInt());
}
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
package com.microsoft.azure.webjobs.script.test.categories;
public interface IntegrationTesting {}

Просмотреть файл

@ -0,0 +1,18 @@
package com.microsoft.azure.webjobs.script.test.utilities;
import java.lang.reflect.*;
import java.util.*;
import com.microsoft.azure.webjobs.script.rpc.messages.*;
public class FunctionsTestBase {
protected void loadFunction(FunctionsTestHost host, String id, String method) throws Exception {
String fullname = this.getClass().getCanonicalName() + "." + method;
Map<String, BindingInfo> bindings = new HashMap<>();
Method entry = this.getClass().getMethod(method);
if (!entry.getReturnType().equals(Void.TYPE)) {
bindings.put("$return", BindingInfo.newBuilder().setDirection(BindingInfo.Direction.out).build());
}
host.loadFunction(id, fullname, bindings);
}
}

Просмотреть файл

@ -0,0 +1,195 @@
package com.microsoft.azure.webjobs.script.test.utilities;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import java.util.function.*;
import java.util.stream.*;
import javax.annotation.*;
import javax.annotation.concurrent.*;
import com.google.protobuf.*;
import com.microsoft.azure.webjobs.script.*;
import com.microsoft.azure.webjobs.script.rpc.messages.*;
import io.grpc.*;
import io.grpc.stub.*;
import org.apache.commons.lang3.tuple.*;
public final class FunctionsTestHost implements AutoCloseable, IApplication {
public FunctionsTestHost() throws Exception {
this.initializeServer();
this.initializeClient();
}
@PostConstruct
private void initializeServer() throws IOException {
ServerBuilder<?> builder = ServerBuilder.forPort(this.getPort());
this.grpcHost = new HostGrpcImplementation();
this.server = builder.addService(this.grpcHost).build();
this.server.start();
}
@PostConstruct
private void initializeClient() throws InterruptedException {
this.client = new JavaWorkerClient(this);
this.client.listen("java-worker-test", HostGrpcImplementation.ESTABLISH_REQID);
this.grpcHost.handleMessage(HostGrpcImplementation.ESTABLISH_REQID, m -> this.grpcHost.initWorker());
}
@Override
public void close() throws Exception {
this.client.close();
this.server.shutdownNow().awaitTermination();
}
public void loadFunction(String id, String reflectionName, Map<String, BindingInfo> bindings) throws Exception {
this.grpcHost.handleMessage(HostGrpcImplementation.INITWORKER_REQID, m -> this.grpcHost.loadFunction(id, reflectionName, bindings));
this.grpcHost.handleMessage(HostGrpcImplementation.LOADFUNC_REQID, null);
}
@SafeVarargs
public final InvocationResponse call(String reqId, String funcId, Triple<String, TypedData.DataCase, Object> ...params) throws Exception {
AtomicReference<InvocationResponse> response = new AtomicReference<>();
this.grpcHost.handleMessage(this.lastCallReqId, m -> this.grpcHost.invokeFunction(reqId, funcId, params));
this.grpcHost.handleMessage(reqId, m -> {
response.set(m.getInvocationResponse());
return null;
});
this.lastCallReqId = reqId;
return response.get();
}
@Override
public boolean logToConsole() { return false; }
@Override
public String getHost() { return "localhost"; }
@Override
public int getPort() { return 55005; }
private JavaWorkerClient client;
private HostGrpcImplementation grpcHost;
private Server server;
private String lastCallReqId = HostGrpcImplementation.LOADFUNC_REQID;
@ThreadSafe
private class HostGrpcImplementation extends FunctionRpcGrpc.FunctionRpcImplBase {
static final String ESTABLISH_REQID = "establish", INITWORKER_REQID = "init-worker", LOADFUNC_REQID = "load-function";
private final Lock lock = new ReentrantLock();
private final Map<String, Condition> respReady = new HashMap<>();
private final Map<String, StreamingMessage> respValue = new HashMap<>();
private final Map<String, StreamObserver<StreamingMessage>> responder = new HashMap<>();
private Condition getResponseCondition(String requestId) {
return this.respReady.computeIfAbsent(requestId, k -> this.lock.newCondition());
}
private void setResponse(String requestId, StreamingMessage value, StreamObserver<StreamingMessage> client) {
this.respValue.put(requestId, value);
this.responder.put(requestId, client);
this.getResponseCondition(requestId).signal();
}
void handleMessage(String requestId, Function<StreamingMessage, StreamingMessage> handler) throws InterruptedException {
this.lock.lock();
try {
if (this.responder.get(requestId) == null) {
this.getResponseCondition(requestId).await();
}
StreamingMessage message = this.respValue.get(requestId);
StreamingMessage response = null;
if (handler != null) {
response = handler.apply(message);
}
if (response != null) {
this.responder.get(requestId).onNext(response);
}
} finally {
this.lock.unlock();
}
}
private StreamingMessage initWorker() {
WorkerInitRequest.Builder request = WorkerInitRequest.newBuilder().setHostVersion("2.0.0");
return StreamingMessage.newBuilder().setRequestId(INITWORKER_REQID).setWorkerInitRequest(request).build();
}
private StreamingMessage loadFunction(String id, String reflectionName, Map<String, BindingInfo> bindings) {
RpcFunctionMetadata.Builder metadata = RpcFunctionMetadata.newBuilder()
.setName(reflectionName.substring(reflectionName.lastIndexOf('.') + 1))
.setDirectory(".")
.setScriptFile(System.getProperty("testing-project-jar"))
.setEntryPoint(reflectionName)
.putAllBindings(bindings);
FunctionLoadRequest.Builder request = FunctionLoadRequest.newBuilder()
.setFunctionId(id)
.setMetadata(metadata);
return StreamingMessage.newBuilder().setRequestId(LOADFUNC_REQID).setFunctionLoadRequest(request).build();
}
private StreamingMessage invokeFunction(String reqId, String funcId, Triple<String, TypedData.DataCase, Object>[] params) {
List<ParameterBinding> bindings = Arrays.stream(params).map(p -> {
ParameterBinding.Builder binding = ParameterBinding.newBuilder();
TypedData.Builder data = TypedData.newBuilder();
if (p.getLeft() != null && !p.getLeft().isEmpty()) {
binding.setName(p.getLeft());
}
switch (p.getMiddle()) {
case STRING:
data.setString((String) p.getRight());
break;
case JSON:
data.setJson((String) p.getRight());
break;
case BYTES:
data.setBytes((ByteString) p.getRight());
break;
case HTTP:
data.setHttp((RpcHttp.Builder) p.getRight());
break;
case INT:
data.setInt((long)p.getRight());
break;
case DOUBLE:
data.setDouble((double)p.getRight());
break;
default:
throw new UnsupportedOperationException(p.toString());
}
return binding.setData(data).build();
}).collect(Collectors.toList());
InvocationRequest.Builder request = InvocationRequest.newBuilder()
.setInvocationId(reqId)
.setFunctionId(funcId)
.addAllInputData(bindings);
return StreamingMessage.newBuilder().setRequestId(reqId).setInvocationRequest(request).build();
}
@Override
public StreamObserver<StreamingMessage> eventStream(StreamObserver<StreamingMessage> responseObserver) {
return new StreamObserver<StreamingMessage>() {
@Override
public void onNext(StreamingMessage msg) {
HostGrpcImplementation.this.lock.lock();
try {
if (!msg.getContentCase().equals(StreamingMessage.ContentCase.RPC_LOG)) {
HostGrpcImplementation.this.setResponse(msg.getRequestId(), msg, responseObserver);
}
} finally {
HostGrpcImplementation.this.lock.unlock();
}
}
@Override
public void onError(Throwable t) {
throw new RuntimeException(t);
}
@Override
public void onCompleted() {}
};
}
}
}