KAFKA-8945/KAFKA-8947: Fix bugs in Connect REST extension API (#7392)

Fix bug in Connect REST extension API caused by invalid constructor parameter validation, and update integration test to play nicely with Jenkins

Fix instantiation of TaskState objects by Connect framework.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Magesh Nandakumar <mageshn@confluent.io>, Randall Hauch <rhauch@gmail.com>
This commit is contained in:
Chris Egerton 2019-10-15 15:27:26 -07:00 коммит произвёл Randall Hauch
Родитель 5a8ad0a802
Коммит 3b78db3675
6 изменённых файлов: 173 добавлений и 20 удалений

Просмотреть файл

@ -17,6 +17,8 @@
package org.apache.kafka.connect.health;
import java.util.Objects;
/**
* Provides the current status along with identifier for Connect worker and tasks.
*/
@ -34,10 +36,10 @@ public abstract class AbstractState {
* @param traceMessage any error trace message associated with the connector or the task; may be null or empty
*/
public AbstractState(String state, String workerId, String traceMessage) {
if (state != null && !state.trim().isEmpty()) {
if (state == null || state.trim().isEmpty()) {
throw new IllegalArgumentException("State must not be null or empty");
}
if (workerId != null && !workerId.trim().isEmpty()) {
if (workerId == null || workerId.trim().isEmpty()) {
throw new IllegalArgumentException("Worker ID must not be null or empty");
}
this.state = state;
@ -71,4 +73,21 @@ public abstract class AbstractState {
public String traceMessage() {
return traceMessage;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
AbstractState that = (AbstractState) o;
return state.equals(that.state)
&& Objects.equals(traceMessage, that.traceMessage)
&& workerId.equals(that.workerId);
}
@Override
public int hashCode() {
return Objects.hash(state, traceMessage, workerId);
}
}

Просмотреть файл

@ -35,7 +35,7 @@ public class ConnectorHealth {
ConnectorState connectorState,
Map<Integer, TaskState> tasks,
ConnectorType type) {
if (name != null && !name.trim().isEmpty()) {
if (name == null || name.trim().isEmpty()) {
throw new IllegalArgumentException("Connector name is required");
}
Objects.requireNonNull(connectorState, "connectorState can't be null");
@ -83,4 +83,31 @@ public class ConnectorHealth {
return type;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ConnectorHealth that = (ConnectorHealth) o;
return name.equals(that.name)
&& connectorState.equals(that.connectorState)
&& tasks.equals(that.tasks)
&& type == that.type;
}
@Override
public int hashCode() {
return Objects.hash(name, connectorState, tasks, type);
}
@Override
public String toString() {
return "ConnectorHealth{"
+ "name='" + name + '\''
+ ", connectorState=" + connectorState
+ ", tasks=" + tasks
+ ", type=" + type
+ '}';
}
}

Просмотреть файл

@ -32,4 +32,13 @@ public class ConnectorState extends AbstractState {
public ConnectorState(String state, String workerId, String traceMessage) {
super(state, workerId, traceMessage);
}
@Override
public String toString() {
return "ConnectorState{"
+ "state='" + state() + '\''
+ ", traceMessage='" + traceMessage() + '\''
+ ", workerId='" + workerId() + '\''
+ '}';
}
}

Просмотреть файл

@ -50,20 +50,28 @@ public class TaskState extends AbstractState {
@Override
public boolean equals(Object o) {
if (this == o) {
if (this == o)
return true;
}
if (o == null || getClass() != o.getClass()) {
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
}
TaskState taskState = (TaskState) o;
return taskId == taskState.taskId;
}
@Override
public int hashCode() {
return Objects.hash(taskId);
return Objects.hash(super.hashCode(), taskId);
}
@Override
public String toString() {
return "TaskState{"
+ "taskId='" + taskId + '\''
+ "state='" + state() + '\''
+ ", traceMessage='" + traceMessage() + '\''
+ ", workerId='" + workerId() + '\''
+ '}';
}
}

Просмотреть файл

@ -107,7 +107,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
for (ConnectorStateInfo.TaskState state : states) {
taskStates.put(
state.id(),
new TaskState(state.id(), state.workerId(), state.state(), state.trace())
new TaskState(state.id(), state.state(), state.workerId(), state.trace())
);
}
return taskStates;

Просмотреть файл

@ -16,10 +16,16 @@
*/
package org.apache.kafka.connect.integration;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Test;
@ -28,12 +34,18 @@ import org.junit.experimental.categories.Category;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.REST_EXTENSION_CLASSES_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
/**
* A simple integration test to ensure that REST extensions are registered correctly.
@ -41,39 +53,86 @@ import static org.apache.kafka.test.TestUtils.waitForCondition;
@Category(IntegrationTest.class)
public class RestExtensionIntegrationTest {
private static final int NUM_WORKERS = 3;
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private EmbeddedConnectCluster connect;
@Test
public void testImmediateRequestForListOfConnectors() throws IOException, InterruptedException {
public void testRestExtensionApi() throws IOException, InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.build();
.name("connect-cluster")
.numWorkers(1)
.numBrokers(1)
.workerProps(workerProps)
.build();
// start the clusters
connect.start();
WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
waitForCondition(
this::extensionIsRegistered,
REST_EXTENSION_REGISTRATION_TIMEOUT_MS,
"REST extension was never registered"
);
ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle("test-conn");
try {
// setup up props for the connector
Map<String, String> connectorProps = new HashMap<>();
connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSinkConnector.class.getSimpleName());
connectorProps.put(TASKS_MAX_CONFIG, String.valueOf(1));
connectorProps.put(TOPICS_CONFIG, "test-topic");
// start a connector
connectorHandle.taskHandle(connectorHandle.name() + "-0");
StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
connect.configureConnector(connectorHandle.name(), connectorProps);
connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
ConnectorHealth expectedHealth = new ConnectorHealth(
connectorHandle.name(),
new ConnectorState(
"RUNNING",
workerId,
null
),
Collections.singletonMap(
0,
new TaskState(0, "RUNNING", workerId, null)
),
ConnectorType.SINK
);
connectorProps.put(NAME_CONFIG, connectorHandle.name());
// Test the REST extension API; specifically, that the connector's health and configuration
// are available to the REST extension we registered and that they contain expected values
waitForCondition(
() -> verifyConnectorHealthAndConfig(connectorHandle.name(), expectedHealth, connectorProps),
CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS,
"Connector health and/or config was never accessible by the REST extension"
);
} finally {
RuntimeHandles.get().deleteConnector(connectorHandle.name());
}
}
@After
public void close() {
// stop all Connect, Kafka and Zk threads.
connect.stop();
IntegrationTestRestExtension.instance = null;
}
private boolean extensionIsRegistered() {
@ -85,11 +144,42 @@ public class RestExtensionIntegrationTest {
}
}
private boolean verifyConnectorHealthAndConfig(
String connectorName,
ConnectorHealth expectedHealth,
Map<String, String> expectedConfig
) {
ConnectClusterState clusterState =
IntegrationTestRestExtension.instance.restPluginContext.clusterState();
ConnectorHealth actualHealth = clusterState.connectorHealth(connectorName);
if (actualHealth.tasksState().isEmpty()) {
// Happens if the task has been started but its status has not yet been picked up from
// the status topic by the worker.
return false;
}
Map<String, String> actualConfig = clusterState.connectorConfig(connectorName);
assertEquals(expectedConfig, actualConfig);
assertEquals(expectedHealth, actualHealth);
return true;
}
public static class IntegrationTestRestExtension implements ConnectRestExtension {
private static IntegrationTestRestExtension instance;
public ConnectRestExtensionContext restPluginContext;
@Override
public void register(ConnectRestExtensionContext restPluginContext) {
instance = this;
this.restPluginContext = restPluginContext;
// Immediately request a list of connectors to confirm that the context and its fields
// has been fully initialized and there is no risk of deadlock
restPluginContext.clusterState().connectors();
// Install a new REST resource that can be used to confirm that the extension has been
// successfully registered
restPluginContext.configurable().register(new IntegrationTestRestExtensionResource());
}