diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 11ea3209d..7afc95dda 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskAssignmentException; import org.apache.kafka.streams.processor.TaskId; @@ -135,8 +136,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf void addConsumer(final String consumerMemberId, final SubscriptionInfo info) { consumers.add(consumerMemberId); - state.addPreviousActiveTasks(info.prevTasks()); - state.addPreviousStandbyTasks(info.standbyTasks()); + state.addPreviousActiveTasks(consumerMemberId, info.prevTasks()); + state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks()); state.incrementCapacity(); } @@ -286,6 +287,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final Set futureConsumers = new HashSet<>(); int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION; + int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION; int futureMetadataVersion = UNKNOWN; for (final Map.Entry entry : subscriptions.entrySet()) { @@ -303,6 +305,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf minReceivedMetadataVersion = usedVersion; } + final int latestSupportedVersion = info.latestSupportedVersion(); + if (latestSupportedVersion < minSupportedMetadataVersion) { + minSupportedMetadataVersion = latestSupportedVersion; + } + // create the new client metadata if necessary ClientMetadata clientMetadata = clientMetadataMap.get(info.processId()); @@ -572,14 +579,16 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf partitionsForTask, partitionsByHostState, futureConsumers, - minReceivedMetadataVersion + minReceivedMetadataVersion, + minSupportedMetadataVersion ); } else { assignment = computeNewAssignment( clientMetadataMap, partitionsForTask, partitionsByHostState, - minReceivedMetadataVersion + minReceivedMetadataVersion, + minSupportedMetadataVersion ); } @@ -589,7 +598,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private static Map computeNewAssignment(final Map clientsMetadata, final Map> partitionsForTask, final Map> partitionsByHostState, - final int minUserMetadataVersion) { + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { final Map assignment = new HashMap<>(); // within the client, distribute tasks to its owned consumers @@ -605,17 +615,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf int consumerTaskIndex = 0; for (final String consumer : consumers) { + final List activeTasks = interleavedActive.get(consumerTaskIndex); + + // These will be filled in by buildAssignedActiveTaskAndPartitionsList below + final List activePartitionsList = new ArrayList<>(); + final List assignedActiveList = new ArrayList<>(); + + buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + final Map> standby = new HashMap<>(); - final List assignedPartitions = new ArrayList<>(); - - final List assignedActiveList = interleavedActive.get(consumerTaskIndex); - - for (final TaskId taskId : assignedActiveList) { - for (final TopicPartition partition : partitionsForTask.get(taskId)) { - assignedPartitions.add(new AssignedPartition(taskId, partition)); - } - } - if (!state.standbyTasks().isEmpty()) { final List assignedStandbyList = interleavedStandby.get(consumerTaskIndex); for (final TaskId taskId : assignedStandbyList) { @@ -625,22 +633,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf consumerTaskIndex++; - Collections.sort(assignedPartitions); - final List active = new ArrayList<>(); - final List activePartitions = new ArrayList<>(); - for (final AssignedPartition partition : assignedPartitions) { - active.add(partition.taskId); - activePartitions.add(partition.partition); - } - // finally, encode the assignment before sending back to coordinator assignment.put( consumer, new Assignment( - activePartitions, + activePartitionsList, new AssignmentInfo( minUserMetadataVersion, - active, + minSupportedMetadataVersion, + assignedActiveList, standby, partitionsByHostState, 0 @@ -657,7 +658,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final Map> partitionsForTask, final Map> partitionsByHostState, final Set futureConsumers, - final int minUserMetadataVersion) { + final int minUserMetadataVersion, + final int minSupportedMetadataVersion) { final Map assignment = new HashMap<>(); // assign previously assigned tasks to "old consumers" @@ -668,23 +670,27 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf continue; } - final List activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks()); + // Return the same active tasks that were claimed in the subscription + final List activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId)); - final List assignedPartitions = new ArrayList<>(); - for (final TaskId taskId : activeTasks) { - assignedPartitions.addAll(partitionsForTask.get(taskId)); - } + // These will be filled in by buildAssignedActiveTaskAndPartitionsList below + final List activePartitionsList = new ArrayList<>(); + final List assignedActiveList = new ArrayList<>(); + buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask); + + // Return the same standby tasks that were claimed in the subscription final Map> standbyTasks = new HashMap<>(); - for (final TaskId taskId : clientMetadata.state.prevStandbyTasks()) { + for (final TaskId taskId : clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) { standbyTasks.put(taskId, partitionsForTask.get(taskId)); } assignment.put(consumerId, new Assignment( - assignedPartitions, + activePartitionsList, new AssignmentInfo( minUserMetadataVersion, - activeTasks, + minSupportedMetadataVersion, + assignedActiveList, standbyTasks, partitionsByHostState, 0) @@ -697,13 +703,34 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf for (final String consumerId : futureConsumers) { assignment.put(consumerId, new Assignment( Collections.emptyList(), - new AssignmentInfo().encode() + new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion).encode() )); } return assignment; } + private static void buildAssignedActiveTaskAndPartitionsList(final List activeTasks, + final List activePartitionsList, + final List assignedActiveList, + final Map> partitionsForTask) { + final List assignedPartitions = new ArrayList<>(); + + // Build up list of all assigned partition-task pairs + for (final TaskId taskId : activeTasks) { + for (final TopicPartition partition : partitionsForTask.get(taskId)) { + assignedPartitions.add(new AssignedPartition(taskId, partition)); + } + } + + // Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping + Collections.sort(assignedPartitions); + for (final AssignedPartition partition : assignedPartitions) { + assignedActiveList.add(partition.taskId); + activePartitionsList.add(partition.partition); + } + } + // visible for testing static List> interleaveTasksByGroupId(final Collection taskIds, final int numberThreads) { final LinkedList sortedTasks = new LinkedList<>(taskIds); @@ -724,15 +751,66 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf return taskIdsForConsumerAssignment; } - private void upgradeSubscriptionVersionIfNeeded(final int leaderSupportedVersion) { - if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - leaderSupportedVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; + private void validateMetadataVersions(final int receivedAssignmentMetadataVersion, + final int latestCommonlySupportedVersion) { + + if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) { + log.error("Leader sent back an assignment with version {} which was greater than our used version {}", + receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion); + throw new TaskAssignmentException( + "Sent a version " + usedSubscriptionMetadataVersion + + " subscription but got an assignment with higher version " + + receivedAssignmentMetadataVersion + "." + ); } + + if (latestCommonlySupportedVersion > LATEST_SUPPORTED_VERSION) { + log.error("Leader sent back assignment with commonly supported version {} that is greater than our " + + "actual latest supported version {}", latestCommonlySupportedVersion, LATEST_SUPPORTED_VERSION); + throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support"); + } + + if (receivedAssignmentMetadataVersion < EARLIEST_PROBEABLE_VERSION) { + log.error("Leader sent back assignment with version {} which is less than the earliest probeable version {}. " + + "To do a rolling upgrade from a version earlier than {} you must set the {} config", + receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION, EARLIEST_PROBEABLE_VERSION, + StreamsConfig.UPGRADE_FROM_CONFIG); + throw new IllegalStateException("Can't use version probing with versions less than " + EARLIEST_PROBEABLE_VERSION); + } + } + + // Returns true if subscription version was changed, indicating version probing and need to rebalance again + protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion, + final int latestCommonlySupportedVersion) { + // If the latest commonly supported version is now greater than our used version, this indicates we have just + // completed the rolling upgrade and can now update our subscription version for the final rebalance + if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) { + log.info( + "Sent a version {} subscription and group's latest commonly supported version is {} (successful " + + "version probing and end of rolling upgrade). Upgrading subscription metadata version to " + + "{} for next rebalance.", + usedSubscriptionMetadataVersion, + latestCommonlySupportedVersion, + latestCommonlySupportedVersion + ); + usedSubscriptionMetadataVersion = latestCommonlySupportedVersion; + return true; + } + + // If we received a lower version than we sent, someone else in the group still hasn't upgraded. We + // should downgrade our subscription until everyone is on the latest version + if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) { + log.info( + "Sent a version {} subscription and got version {} assignment back (successful version probing). " + + "Downgrade subscription metadata to commonly supported version and trigger new rebalance.", + usedSubscriptionMetadataVersion, + receivedAssignmentMetadataVersion + ); + usedSubscriptionMetadataVersion = latestCommonlySupportedVersion; + return true; + } + + return false; } /** @@ -746,43 +824,27 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); if (info.errCode() != AssignorError.NONE.code()) { // set flag to shutdown streams app - assignmentErrorCode.set(info.errCode()); + setAssignmentErrorCode(info.errCode()); return; } + /* + * latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION] + * receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion] + * + * usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling + * bounce upgrade with version probing. + * + * usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have + * been bounced and it is safe to use the latest version. + */ final int receivedAssignmentMetadataVersion = info.version(); - final int leaderSupportedVersion = info.latestSupportedVersion(); + final int latestCommonlySupportedVersion = info.commonlySupportedVersion(); - if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) { - throw new IllegalStateException( - "Sent a version " + usedSubscriptionMetadataVersion - + " subscription but got an assignment with higher version " - + receivedAssignmentMetadataVersion + "." - ); - } + validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion); - if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion - && receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) { - - if (receivedAssignmentMetadataVersion == leaderSupportedVersion) { - log.info( - "Sent a version {} subscription and got version {} assignment back (successful version probing). " + - "Downgrading subscription metadata to received version and trigger new rebalance.", - usedSubscriptionMetadataVersion, - receivedAssignmentMetadataVersion - ); - usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion; - } else { - log.info( - "Sent a version {} subscription and got version {} assignment back (successful version probing). " + - "Setting subscription metadata to leaders supported version {} and trigger new rebalance.", - usedSubscriptionMetadataVersion, - receivedAssignmentMetadataVersion, - leaderSupportedVersion - ); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } - - assignmentErrorCode.set(AssignorError.VERSION_PROBING.code()); + // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so + if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) { + setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); return; } @@ -800,13 +862,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf partitionsByHost = Collections.emptyMap(); break; case VERSION_TWO: - processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId); - partitionsByHost = info.partitionsByHost(); - break; case VERSION_THREE: case VERSION_FOUR: case VERSION_FIVE: - upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion); processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId); partitionsByHost = info.partitionsByHost(); break; @@ -915,6 +973,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf } } + protected void setAssignmentErrorCode(final Integer errorCode) { + assignmentErrorCode.set(errorCode); + } + // following functions are for test only void setInternalTopicManager(final InternalTopicManager internalTopicManager) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index 17563fd46..3adac4403 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -72,9 +72,11 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { ); } else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) { log.debug( - "Encountered assignment error during partition assignment: {}. Skipping task initialization", + "Encountered assignment error during partition assignment: {}. Skipping task initialization and " + + "pausing any partitions we may have been assigned.", streamThread.getAssignmentErrorCode() ); + taskManager.pausePartitions(); } else { // Close non-reassigned tasks before initializing new ones as we may have suspended active // tasks that become standbys or vice versa diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 0a7acf333..cd90fad80 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -129,8 +129,7 @@ public class TaskManager { } // Pause all the new partitions until the underlying state store is ready for all the active tasks. - log.trace("Pausing partitions: {}", assignment); - consumer.pause(assignment); + pausePartitions(); } private void resumeSuspended(final Collection assignment) { @@ -366,6 +365,11 @@ public class TaskManager { return taskCreator.builder(); } + void pausePartitions() { + log.trace("Pausing partitions: {}", consumer.assignment()); + consumer.pause(consumer.assignment()); + } + /** * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 7d812309d..dcaaa9a7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -44,18 +44,21 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); private final int usedVersion; - private final int latestSupportedVersion; + private final int commonlySupportedVersion; private int errCode; private List activeTasks; private Map> standbyTasks; private Map> partitionsByHost; - // used for decoding; don't apply version checks - private AssignmentInfo(final int version, - final int latestSupportedVersion) { - this.usedVersion = version; - this.latestSupportedVersion = latestSupportedVersion; - this.errCode = 0; + // used for decoding and "future consumer" assignments during version probing + public AssignmentInfo(final int version, + final int commonlySupportedVersion) { + this(version, + commonlySupportedVersion, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + 0); } // for testing only @@ -65,40 +68,31 @@ public class AssignmentInfo { this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, 0); } - // creates an empty assignment - public AssignmentInfo() { - this(LATEST_SUPPORTED_VERSION, - Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap(), - 0); - } - public AssignmentInfo(final int version, final List activeTasks, final Map> standbyTasks, final Map> partitionsByHost, final int errCode) { this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode); - if (version < 1 || version > LATEST_SUPPORTED_VERSION) { - throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION - + "; was: " + version); - } } - // for testing only; don't apply version checks - AssignmentInfo(final int version, - final int latestSupportedVersion, - final List activeTasks, - final Map> standbyTasks, - final Map> partitionsByHost, - final int errCode) { + public AssignmentInfo(final int version, + final int commonlySupportedVersion, + final List activeTasks, + final Map> standbyTasks, + final Map> partitionsByHost, + final int errCode) { this.usedVersion = version; - this.latestSupportedVersion = latestSupportedVersion; + this.commonlySupportedVersion = commonlySupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = partitionsByHost; this.errCode = errCode; + + if (version < 1 || version > LATEST_SUPPORTED_VERSION) { + throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + + "; was: " + version); + } } public int version() { @@ -109,8 +103,8 @@ public class AssignmentInfo { return errCode; } - public int latestSupportedVersion() { - return latestSupportedVersion; + public int commonlySupportedVersion() { + return commonlySupportedVersion; } public List activeTasks() { @@ -422,7 +416,7 @@ public class AssignmentInfo { @Override public int hashCode() { - return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() + return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode() ^ errCode; } @@ -431,7 +425,7 @@ public class AssignmentInfo { if (o instanceof AssignmentInfo) { final AssignmentInfo other = (AssignmentInfo) o; return usedVersion == other.usedVersion && - latestSupportedVersion == other.latestSupportedVersion && + commonlySupportedVersion == other.commonlySupportedVersion && errCode == other.errCode && activeTasks.equals(other.activeTasks) && standbyTasks.equals(other.standbyTasks) && @@ -444,7 +438,7 @@ public class AssignmentInfo { @Override public String toString() { return "[version=" + usedVersion - + ", supported version=" + latestSupportedVersion + + ", supported version=" + commonlySupportedVersion + ", active tasks=" + activeTasks + ", standby tasks=" + standbyTasks + ", partitions by host=" + partitionsByHost + "]"; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 6eff7bdc7..ab213d520 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.HashMap; +import java.util.Map; import org.apache.kafka.streams.processor.TaskId; import java.util.HashSet; @@ -29,15 +31,25 @@ public class ClientState { private final Set prevStandbyTasks; private final Set prevAssignedTasks; - private int capacity; + private final Map> prevActiveTasksByConsumer; + private final Map> prevStandbyTasksByConsumer; + private int capacity; public ClientState() { this(0); } ClientState(final int capacity) { - this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), capacity); + this(new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashSet<>(), + new HashMap<>(), + new HashMap<>(), + capacity); } private ClientState(final Set activeTasks, @@ -46,6 +58,8 @@ public class ClientState { final Set prevActiveTasks, final Set prevStandbyTasks, final Set prevAssignedTasks, + final Map> prevActiveTasksByConsumer, + final Map> prevStandbyTasksByConsumer, final int capacity) { this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; @@ -53,6 +67,8 @@ public class ClientState { this.prevActiveTasks = prevActiveTasks; this.prevStandbyTasks = prevStandbyTasks; this.prevAssignedTasks = prevAssignedTasks; + this.prevActiveTasksByConsumer = prevActiveTasksByConsumer; + this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer; this.capacity = capacity; } @@ -64,6 +80,8 @@ public class ClientState { new HashSet<>(prevActiveTasks), new HashSet<>(prevStandbyTasks), new HashSet<>(prevAssignedTasks), + new HashMap<>(prevActiveTasksByConsumer), + new HashMap<>(prevStandbyTasksByConsumer), capacity); } @@ -107,14 +125,24 @@ public class ClientState { return activeTasks.size(); } - public void addPreviousActiveTasks(final Set prevTasks) { + public void addPreviousActiveTasks(final String consumer, final Set prevTasks) { prevActiveTasks.addAll(prevTasks); prevAssignedTasks.addAll(prevTasks); + prevActiveTasksByConsumer.put(consumer, prevTasks); } - public void addPreviousStandbyTasks(final Set standbyTasks) { + public void addPreviousStandbyTasks(final String consumer, final Set standbyTasks) { prevStandbyTasks.addAll(standbyTasks); prevAssignedTasks.addAll(standbyTasks); + prevStandbyTasksByConsumer.put(consumer, standbyTasks); + } + + public Set prevActiveTasksForConsumer(final String consumer) { + return prevActiveTasksByConsumer.get(consumer); + } + + public Set prevStandbyTasksForConsumer(final String consumer) { + return prevStandbyTasksByConsumer.get(consumer); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 5f67a9875..e997f2e12 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -1236,7 +1236,7 @@ public class StreamsPartitionAssignorTest { ))); assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1))); - assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo())); + assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION))); assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 020ee363c..7e1ca7f96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -319,6 +319,7 @@ public class TaskManagerTest { @Test public void shouldPauseActivePartitions() { mockSingleActiveTask(); + expect(consumer.assignment()).andReturn(taskId0Partitions).times(2); consumer.pause(taskId0Partitions); expectLastCall(); replay(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 034ae7b91..dc54c865d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -70,7 +70,7 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2)); + client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2)); assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, tid2))); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } @@ -80,7 +80,7 @@ public class ClientStateTest { final TaskId tid1 = new TaskId(0, 1); final TaskId tid2 = new TaskId(0, 2); - client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2)); + client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2)); assertThat(client.previousActiveTasks().size(), equalTo(0)); assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2))); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 17d403f13..19d773078 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -207,11 +207,11 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignTasksToClientWithPreviousStandbyTasks() { final ClientState client1 = createClient(p1, 1); - client1.addPreviousStandbyTasks(Utils.mkSet(task02)); + client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02)); final ClientState client2 = createClient(p2, 1); - client2.addPreviousStandbyTasks(Utils.mkSet(task01)); + client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final ClientState client3 = createClient(p3, 1); - client3.addPreviousStandbyTasks(Utils.mkSet(task00)); + client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -225,9 +225,9 @@ public class StickyTaskAssignorTest { @Test public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00); - c1.addPreviousStandbyTasks(Utils.mkSet(task01)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02); - c2.addPreviousStandbyTasks(Utils.mkSet(task01)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02); @@ -455,9 +455,9 @@ public class StickyTaskAssignorTest { @Test public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() { final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02); - c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02)); createClient(p3, 1); createClient(p4, 1); @@ -577,14 +577,14 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23); - c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12)); + c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12)); final ClientState newClient = createClient(p4, 1); - newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); + newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -607,15 +607,15 @@ public class StickyTaskAssignorTest { final TaskId task23 = new TaskId(2, 3); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13); - c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23)); + c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11, task20, task21, task23)); final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22); - c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23)); final ClientState bounce1 = createClient(p3, 1); - bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23)); + bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20, task21, task23)); final ClientState bounce2 = createClient(p4, 1); - bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10)); + bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task03, task10)); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23); taskAssignor.assign(0); @@ -658,7 +658,7 @@ public class StickyTaskAssignorTest { final TaskId task06 = new TaskId(0, 6); final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06); final ClientState c2 = createClient(p2, 1); - c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05)); + c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04, task05)); final ClientState newClient = createClient(p3, 1); final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06); @@ -705,7 +705,7 @@ public class StickyTaskAssignorTest { private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) { final ClientState clientState = new ClientState(capacity); - clientState.addPreviousActiveTasks(Utils.mkSet(taskIds)); + clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds)); clients.put(processId, clientState); return clientState; } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index de5311cef..0e07cac97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.apache.kafka.streams.processor.internals.TaskManager; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; import org.apache.kafka.streams.state.HostInfo; @@ -167,6 +168,11 @@ public class StreamsUpgradeTest { final AssignmentInfo info = AssignmentInfo.decode( assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION)); + if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) { + setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); + return; + } + final List partitions = new ArrayList<>(assignment.partitions()); partitions.sort(PARTITION_COMPARATOR); @@ -305,6 +311,7 @@ public class StreamsUpgradeTest { private FutureAssignmentInfo(final boolean bumpUsedVersion, final boolean bumpSupportedVersion, final ByteBuffer bytes) { + super(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION); this.bumpUsedVersion = bumpUsedVersion; this.bumpSupportedVersion = bumpSupportedVersion; originalUserMetadata = bytes; diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index fb8573281..45e20ce26 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -355,8 +355,6 @@ class StreamsUpgradeTest(Test): for p in self.processors: found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True)) if len(found) >= self.leader_counter[p] + 1: - if self.leader is not None: - raise Exception("Could not uniquely identify leader") self.leader = p self.leader_counter[p] = self.leader_counter[p] + 1 @@ -557,38 +555,25 @@ class StreamsUpgradeTest(Test): else: self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1 - if processor == self.leader: - leader_monitor = log_monitor - elif first_other_processor == self.leader: - leader_monitor = first_other_monitor - elif second_other_processor == self.leader: - leader_monitor = second_other_monitor - else: - raise Exception("Could not identify leader.") - monitors = {} monitors[processor] = log_monitor monitors[first_other_processor] = first_other_monitor monitors[second_other_processor] = second_other_monitor - leader_monitor.wait_until("Received a future (version probing) subscription (version: 6). Sending empty assignment back (with supported version 5).", - timeout_sec=60, - err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) - if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Setting subscription metadata to leaders supported version 6 and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) - first_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.", + first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account)) - second_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.", + err_msg="Never saw output 'Upgrade metadata to version 6' on" + str(first_other_node.account)) + second_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account)) + err_msg="Never saw output 'Upgrade metadata to version 6' on" + str(second_other_node.account)) log_monitor.wait_until("Version probing detected. Triggering new rebalance.", timeout_sec=60,