diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 36196a68d..d2f2b3a8e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -65,6 +65,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf private final static int VERSION_TWO = 2; private final static int VERSION_THREE = 3; private final static int VERSION_FOUR = 4; + private final static int VERSION_FIVE = 5; private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE; protected final Set supportedVersions = new HashSet<>(); @@ -446,7 +447,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf for (final String topic : topicsInfo.sourceTopics) { if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && !metadata.topics().contains(topic)) { - log.error("Missing source topic {} durign assignment. Returning error {}.", + log.error("Missing source topic {} during assignment. Returning error {}.", topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name()); return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code)); } @@ -625,6 +626,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf for (final Map.Entry entry : clientMetadataMap.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; + // if application server is configured, also include host state map if (hostInfo != null) { final Set topicPartitions = new HashSet<>(); final ClientState state = entry.getValue().state; @@ -775,6 +777,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf return taskIdsForConsumerAssignment; } + private void upgradeSubscriptionVersionIfNeeded(final int leaderSupportedVersion) { + if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { + log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + + "Upgrading subscription metadata version to {} for next rebalance.", + usedSubscriptionMetadataVersion, + leaderSupportedVersion, + leaderSupportedVersion); + usedSubscriptionMetadataVersion = leaderSupportedVersion; + } + } + /** * @throws TaskAssignmentException if there is no task id for one of the partitions specified */ @@ -835,29 +848,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf partitionsByHost = info.partitionsByHost(); break; case VERSION_THREE: - if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - leaderSupportedVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } + upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion); processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; case VERSION_FOUR: - if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { - log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + - "Upgrading subscription metadata version to {} for next rebalance.", - usedSubscriptionMetadataVersion, - leaderSupportedVersion, - leaderSupportedVersion); - usedSubscriptionMetadataVersion = leaderSupportedVersion; - } + upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion); processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; + case VERSION_FIVE: + upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion); + processVersionFiveAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; default: throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/"); } @@ -918,6 +922,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); } + private void processVersionFiveAssignment(final AssignmentInfo info, + final List partitions, + final Map> activeTasks, + final Map topicToPartitionInfo) { + processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + // for testing protected void processLatestVersionAssignment(final AssignmentInfo info, final List partitions, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index 8ad4036f6..4b2dae260 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -41,7 +41,7 @@ public class AssignmentInfo { private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 4; + public static final int LATEST_SUPPORTED_VERSION = 5; static final int UNKNOWN = -1; private final int usedVersion; @@ -59,12 +59,14 @@ public class AssignmentInfo { this.errCode = 0; } + // for testing only public AssignmentInfo(final List activeTasks, final Map> standbyTasks, - final Map> hostState) { - this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0); + final Map> partitionsByHost) { + this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, 0); } + // creates an empty assignment public AssignmentInfo() { this(LATEST_SUPPORTED_VERSION, Collections.emptyList(), @@ -76,9 +78,9 @@ public class AssignmentInfo { public AssignmentInfo(final int version, final List activeTasks, final Map> standbyTasks, - final Map> hostState, + final Map> partitionsByHost, final int errCode) { - this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode); + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode); if (version < 1 || version > LATEST_SUPPORTED_VERSION) { throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + "; was: " + version); @@ -90,13 +92,13 @@ public class AssignmentInfo { final int latestSupportedVersion, final List activeTasks, final Map> standbyTasks, - final Map> hostState, + final Map> partitionsByHost, final int errCode) { this.usedVersion = version; this.latestSupportedVersion = latestSupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; - this.partitionsByHost = hostState; + this.partitionsByHost = partitionsByHost; this.errCode = errCode; } @@ -145,6 +147,9 @@ public class AssignmentInfo { case 4: encodeVersionFour(out); break; + case 5: + encodeVersionFive(out); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -192,13 +197,50 @@ public class AssignmentInfo { // encode partitions by host out.writeInt(partitionsByHost.size()); for (final Map.Entry> entry : partitionsByHost.entrySet()) { - final HostInfo hostInfo = entry.getKey(); - out.writeUTF(hostInfo.host()); - out.writeInt(hostInfo.port()); + writeHostInfo(out, entry.getKey()); writeTopicPartitions(out, entry.getValue()); } } + private void encodePartitionsByHostAsDictionary(final DataOutputStream out) throws IOException { + + // Build a dictionary to encode topicNames + int topicIndex = 0; + final Map topicNameDict = new HashMap<>(); + for (final Map.Entry> entry : partitionsByHost.entrySet()) { + for (final TopicPartition topicPartition : entry.getValue()) { + if (!topicNameDict.containsKey(topicPartition.topic())) { + topicNameDict.put(topicPartition.topic(), topicIndex++); + } + } + } + + // write the topic name dictionary out + out.writeInt(topicNameDict.size()); + for (final Map.Entry entry : topicNameDict.entrySet()) { + out.writeInt(entry.getValue()); + out.writeUTF(entry.getKey()); + } + + // encode partitions by host + out.writeInt(partitionsByHost.size()); + + // Write the topic index, partition + for (final Map.Entry> entry : partitionsByHost.entrySet()) { + writeHostInfo(out, entry.getKey()); + out.writeInt(entry.getValue().size()); + for (final TopicPartition partition : entry.getValue()) { + out.writeInt(topicNameDict.get(partition.topic())); + out.writeInt(partition.partition()); + } + } + } + + private void writeHostInfo(final DataOutputStream out, final HostInfo hostInfo) throws IOException { + out.writeUTF(hostInfo.host()); + out.writeInt(hostInfo.port()); + } + private void writeTopicPartitions(final DataOutputStream out, final Set partitions) throws IOException { out.writeInt(partitions.size()); @@ -223,6 +265,14 @@ public class AssignmentInfo { out.writeInt(errCode); } + private void encodeVersionFive(final DataOutputStream out) throws IOException { + out.writeInt(5); + out.writeInt(LATEST_SUPPORTED_VERSION); + encodeActiveAndStandbyTaskAssignment(out); + encodePartitionsByHostAsDictionary(out); + out.writeInt(errCode); + } + /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ @@ -254,6 +304,11 @@ public class AssignmentInfo { assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); decodeVersionFourData(assignmentInfo, in); break; + case 5: + latestSupportedVersion = in.readInt(); + assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); + decodeVersionFiveData(assignmentInfo, in); + break; default: final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -297,10 +352,10 @@ public class AssignmentInfo { final DataInputStream in) throws IOException { decodeActiveTasks(assignmentInfo, in); decodeStandbyTasks(assignmentInfo, in); - decodeGlobalAssignmentData(assignmentInfo, in); + decodePartitionsByHost(assignmentInfo, in); } - private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo, + private static void decodePartitionsByHost(final AssignmentInfo assignmentInfo, final DataInputStream in) throws IOException { assignmentInfo.partitionsByHost = new HashMap<>(); final int numEntries = in.readInt(); @@ -319,11 +374,37 @@ public class AssignmentInfo { return partitions; } + private static void decodePartitionsByHostUsingDictionary(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + assignmentInfo.partitionsByHost = new HashMap<>(); + final int dictSize = in.readInt(); + final Map topicIndexDict = new HashMap<>(dictSize); + for (int i = 0; i < dictSize; i++) { + topicIndexDict.put(in.readInt(), in.readUTF()); + } + + final int numEntries = in.readInt(); + for (int i = 0; i < numEntries; i++) { + final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt()); + assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in, topicIndexDict)); + } + } + + private static Set readTopicPartitions(final DataInputStream in, + final Map topicIndexDict) throws IOException { + final int numPartitions = in.readInt(); + final Set partitions = new HashSet<>(numPartitions); + for (int j = 0; j < numPartitions; j++) { + partitions.add(new TopicPartition(topicIndexDict.get(in.readInt()), in.readInt())); + } + return partitions; + } + private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo, final DataInputStream in) throws IOException { decodeActiveTasks(assignmentInfo, in); decodeStandbyTasks(assignmentInfo, in); - decodeGlobalAssignmentData(assignmentInfo, in); + decodePartitionsByHost(assignmentInfo, in); } private static void decodeVersionFourData(final AssignmentInfo assignmentInfo, @@ -332,6 +413,14 @@ public class AssignmentInfo { assignmentInfo.errCode = in.readInt(); } + private static void decodeVersionFiveData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeActiveTasks(assignmentInfo, in); + decodeStandbyTasks(assignmentInfo, in); + decodePartitionsByHostUsingDictionary(assignmentInfo, in); + assignmentInfo.errCode = in.readInt(); + } + @Override public int hashCode() { return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() @@ -359,7 +448,6 @@ public class AssignmentInfo { + ", supported version=" + latestSupportedVersion + ", active tasks=" + activeTasks + ", standby tasks=" + standbyTasks - + ", global assignment=" + partitionsByHost + "]"; + + ", partitions by host=" + partitionsByHost + "]"; } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 03b9e2d06..9161527fc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -32,7 +32,7 @@ public class SubscriptionInfo { private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 4; + public static final int LATEST_SUPPORTED_VERSION = 5; static final int UNKNOWN = -1; private final int usedVersion; @@ -127,6 +127,9 @@ public class SubscriptionInfo { case 4: buf = encodeVersionFour(); break; + case 5: + buf = encodeVersionFive(); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -205,12 +208,11 @@ public class SubscriptionInfo { } } - private ByteBuffer encodeVersionThree() { + private ByteBuffer encodeVersionThreeFourAndFive(final int usedVersion) { final byte[] endPointBytes = prepareUserEndPoint(); + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes)); - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); - - buf.putInt(3); // used version + buf.putInt(usedVersion); // used version buf.putInt(LATEST_SUPPORTED_VERSION); // supported version encodeClientUUID(buf); encodeTasks(buf, prevTasks); @@ -220,22 +222,19 @@ public class SubscriptionInfo { return buf; } + private ByteBuffer encodeVersionThree() { + return encodeVersionThreeFourAndFive(3); + } + private ByteBuffer encodeVersionFour() { - final byte[] endPointBytes = prepareUserEndPoint(); - - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); - - buf.putInt(4); // used version - buf.putInt(LATEST_SUPPORTED_VERSION); // supported version - encodeClientUUID(buf); - encodeTasks(buf, prevTasks); - encodeTasks(buf, standbyTasks); - encodeUserEndPoint(buf, endPointBytes); - - return buf; + return encodeVersionThreeFourAndFive(4); } - protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) { + private ByteBuffer encodeVersionFive() { + return encodeVersionThreeFourAndFive(5); + } + + protected int getVersionThreeFourAndFiveByteLength(final byte[] endPointBytes) { return 4 + // used version 4 + // latest supported version version 16 + // client ID @@ -266,6 +265,7 @@ public class SubscriptionInfo { break; case 3: case 4: + case 5: latestSupportedVersion = data.getInt(); subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion); decodeVersionThreeData(subscriptionInfo, data); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index 8b990659d..0551e1d3e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -94,4 +94,11 @@ public class AssignmentInfoTest { final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } + + @Test + public void shouldEncodeAndDecodeVersion5() { + final AssignmentInfo info = new AssignmentInfo(5, activeTasks, standbyTasks, globalAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(5, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2); + assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 0b2d0b319..50a0a1148 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -271,7 +271,7 @@ public class StreamsUpgradeTest { private ByteBuffer encodeFutureVersion() { final byte[] endPointBytes = prepareUserEndPoint(); - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes)); buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index d382c9700..fb8573281 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -571,24 +571,24 @@ class StreamsUpgradeTest(Test): monitors[first_other_processor] = first_other_monitor monitors[second_other_processor] = second_other_monitor - leader_monitor.wait_until("Received a future (version probing) subscription (version: 5). Sending empty assignment back (with supported version 4).", + leader_monitor.wait_until("Received a future (version probing) subscription (version: 6). Sending empty assignment back (with supported version 5).", timeout_sec=60, err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Setting subscription metadata to leaders supported version 5 and trigger new rebalance.", + log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Setting subscription metadata to leaders supported version 6 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) - first_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", + first_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account)) - second_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", + err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account)) + second_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.", timeout_sec=60, - err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account)) + err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account)) log_monitor.wait_until("Version probing detected. Triggering new rebalance.", timeout_sec=60,