From 5e62b4ae72c6fa8c81aa3ee817ad1bd14e350c4a Mon Sep 17 00:00:00 2001 From: Koji Kawamura Date: Mon, 3 Apr 2017 15:56:21 +0900 Subject: [PATCH] NIFI-3668: Fix purging expired replicate requests. This closes #1646. Newly created async response is added before checking map size nor purging expired ones. If there are already 100 remaining requests, the added request will not be executed nor removed. --- .../ThreadPoolRequestReplicator.java | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java index bc2f8bb09..5a19ca35c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java @@ -316,11 +316,47 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { throw new IllegalArgumentException("Cannot replicate request to 0 nodes"); } + // verify all of the nodes exist and are in the proper state + for (final NodeIdentifier nodeId : nodeIds) { + final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); + if (status == null) { + throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster"); + } + + if (status.getState() != NodeConnectionState.CONNECTED) { + throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected"); + } + } + + logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); + // Update headers to indicate the current revision so that we can // prevent multiple users changing the flow at the same time final Map updatedHeaders = new HashMap<>(headers); final String requestId = updatedHeaders.computeIfAbsent(REQUEST_TRANSACTION_ID_HEADER, key -> UUID.randomUUID().toString()); + long verifyClusterStateNanos = -1; + if (performVerification) { + final long start = System.nanoTime(); + verifyClusterState(method, uri.getPath()); + verifyClusterStateNanos = System.nanoTime() - start; + } + + int numRequests = responseMap.size(); + if (numRequests >= MAX_CONCURRENT_REQUESTS) { + numRequests = purgeExpiredRequests(); + } + + if (numRequests >= MAX_CONCURRENT_REQUESTS) { + final Map countsByUri = responseMap.values().stream().collect( + Collectors.groupingBy( + StandardAsyncClusterResponse::getURIPath, + Collectors.counting())); + + logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); + throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); + } + // create a response object if one was not already passed to us if (response == null) { // create the request objects and replicate to all nodes. @@ -342,44 +378,12 @@ public class ThreadPoolRequestReplicator implements RequestReplicator { final Runnable responseConsumedCallback = () -> onResponseConsumed(requestId); response = new StandardAsyncClusterResponse(requestId, uri, method, nodeIds, - responseMapper, completionCallback, responseConsumedCallback, merge); + responseMapper, completionCallback, responseConsumedCallback, merge); responseMap.put(requestId, response); } - // verify all of the nodes exist and are in the proper state - for (final NodeIdentifier nodeId : nodeIds) { - final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); - if (status == null) { - throw new UnknownNodeException("Node " + nodeId + " does not exist in this cluster"); - } - - if (status.getState() != NodeConnectionState.CONNECTED) { - throw new IllegalClusterStateException("Cannot replicate request to Node " + nodeId + " because the node is not connected"); - } - } - - logger.debug("Replicating request {} {} with entity {} to {}; response is {}", method, uri, entity, nodeIds, response); - - if (performVerification) { - final long start = System.nanoTime(); - verifyClusterState(method, uri.getPath()); - final long nanos = System.nanoTime() - start; - response.addTiming("Verify Cluster State", "All Nodes", nanos); - } - - int numRequests = responseMap.size(); - if (numRequests >= MAX_CONCURRENT_REQUESTS) { - numRequests = purgeExpiredRequests(); - } - - if (numRequests >= MAX_CONCURRENT_REQUESTS) { - final Map countsByUri = responseMap.values().stream().collect( - Collectors.groupingBy( - StandardAsyncClusterResponse::getURIPath, - Collectors.counting())); - - logger.error("Cannot replicate request {} {} because there are {} outstanding HTTP Requests already. Request Counts Per URI = {}", method, uri.getPath(), numRequests, countsByUri); - throw new IllegalStateException("There are too many outstanding HTTP requests with a total " + numRequests + " outstanding requests"); + if (verifyClusterStateNanos > -1) { + response.addTiming("Verify Cluster State", "All Nodes", verifyClusterStateNanos); } logger.debug("For Request ID {}, response object is {}", requestId, response);