KAFKA-7149 : Reducing streams assignment data size (#7185)

* Leader instance uses dictionary encoding on the wire to send topic partitions
* Topic names (most expensive component) are mapped to an integer using the dictionary
* Follower instances receive the dictionary, decode topic names back
* Purely an on-the-wire optimization, no in-memory structures changed
* Test case added for version 5 AssignmentInfo

Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
vinoth chandar 2019-09-05 13:50:55 -07:00 коммит произвёл Guozhang Wang
Родитель deac5d93ce
Коммит ffef0871c2
6 изменённых файлов: 164 добавлений и 58 удалений

Просмотреть файл

@ -65,6 +65,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private final static int VERSION_TWO = 2;
private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
private final static int VERSION_FIVE = 5;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
protected final Set<Integer> supportedVersions = new HashSet<>();
@ -446,7 +447,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
for (final String topic : topicsInfo.sourceTopics) {
if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
!metadata.topics().contains(topic)) {
log.error("Missing source topic {} durign assignment. Returning error {}.",
log.error("Missing source topic {} during assignment. Returning error {}.",
topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
return new GroupAssignment(errorAssignment(clientMetadataMap, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code));
}
@ -625,6 +626,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
final HostInfo hostInfo = entry.getValue().hostInfo;
// if application server is configured, also include host state map
if (hostInfo != null) {
final Set<TopicPartition> topicPartitions = new HashSet<>();
final ClientState state = entry.getValue().state;
@ -775,6 +777,17 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
return taskIdsForConsumerAssignment;
}
private void upgradeSubscriptionVersionIfNeeded(final int leaderSupportedVersion) {
if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
"Upgrading subscription metadata version to {} for next rebalance.",
usedSubscriptionMetadataVersion,
leaderSupportedVersion,
leaderSupportedVersion);
usedSubscriptionMetadataVersion = leaderSupportedVersion;
}
}
/**
* @throws TaskAssignmentException if there is no task id for one of the partitions specified
*/
@ -835,29 +848,20 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
partitionsByHost = info.partitionsByHost();
break;
case VERSION_THREE:
if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
"Upgrading subscription metadata version to {} for next rebalance.",
usedSubscriptionMetadataVersion,
leaderSupportedVersion,
leaderSupportedVersion);
usedSubscriptionMetadataVersion = leaderSupportedVersion;
}
upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion);
processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
case VERSION_FOUR:
if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
"Upgrading subscription metadata version to {} for next rebalance.",
usedSubscriptionMetadataVersion,
leaderSupportedVersion,
leaderSupportedVersion);
usedSubscriptionMetadataVersion = leaderSupportedVersion;
}
upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion);
processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
case VERSION_FIVE:
upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion);
processVersionFiveAssignment(info, partitions, activeTasks, topicToPartitionInfo);
partitionsByHost = info.partitionsByHost();
break;
default:
throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
}
@ -918,6 +922,13 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo);
}
private void processVersionFiveAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,
final Map<TaskId, Set<TopicPartition>> activeTasks,
final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) {
processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo);
}
// for testing
protected void processLatestVersionAssignment(final AssignmentInfo info,
final List<TopicPartition> partitions,

Просмотреть файл

@ -41,7 +41,7 @@ public class AssignmentInfo {
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
public static final int LATEST_SUPPORTED_VERSION = 4;
public static final int LATEST_SUPPORTED_VERSION = 5;
static final int UNKNOWN = -1;
private final int usedVersion;
@ -59,12 +59,14 @@ public class AssignmentInfo {
this.errCode = 0;
}
// for testing only
public AssignmentInfo(final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState) {
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0);
final Map<HostInfo, Set<TopicPartition>> partitionsByHost) {
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, 0);
}
// creates an empty assignment
public AssignmentInfo() {
this(LATEST_SUPPORTED_VERSION,
Collections.emptyList(),
@ -76,9 +78,9 @@ public class AssignmentInfo {
public AssignmentInfo(final int version,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState,
final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
final int errCode) {
this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode);
this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode);
if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
+ "; was: " + version);
@ -90,13 +92,13 @@ public class AssignmentInfo {
final int latestSupportedVersion,
final List<TaskId> activeTasks,
final Map<TaskId, Set<TopicPartition>> standbyTasks,
final Map<HostInfo, Set<TopicPartition>> hostState,
final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
final int errCode) {
this.usedVersion = version;
this.latestSupportedVersion = latestSupportedVersion;
this.activeTasks = activeTasks;
this.standbyTasks = standbyTasks;
this.partitionsByHost = hostState;
this.partitionsByHost = partitionsByHost;
this.errCode = errCode;
}
@ -145,6 +147,9 @@ public class AssignmentInfo {
case 4:
encodeVersionFour(out);
break;
case 5:
encodeVersionFive(out);
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@ -192,13 +197,50 @@ public class AssignmentInfo {
// encode partitions by host
out.writeInt(partitionsByHost.size());
for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
final HostInfo hostInfo = entry.getKey();
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
writeHostInfo(out, entry.getKey());
writeTopicPartitions(out, entry.getValue());
}
}
private void encodePartitionsByHostAsDictionary(final DataOutputStream out) throws IOException {
// Build a dictionary to encode topicNames
int topicIndex = 0;
final Map<String, Integer> topicNameDict = new HashMap<>();
for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
for (final TopicPartition topicPartition : entry.getValue()) {
if (!topicNameDict.containsKey(topicPartition.topic())) {
topicNameDict.put(topicPartition.topic(), topicIndex++);
}
}
}
// write the topic name dictionary out
out.writeInt(topicNameDict.size());
for (final Map.Entry<String, Integer> entry : topicNameDict.entrySet()) {
out.writeInt(entry.getValue());
out.writeUTF(entry.getKey());
}
// encode partitions by host
out.writeInt(partitionsByHost.size());
// Write the topic index, partition
for (final Map.Entry<HostInfo, Set<TopicPartition>> entry : partitionsByHost.entrySet()) {
writeHostInfo(out, entry.getKey());
out.writeInt(entry.getValue().size());
for (final TopicPartition partition : entry.getValue()) {
out.writeInt(topicNameDict.get(partition.topic()));
out.writeInt(partition.partition());
}
}
}
private void writeHostInfo(final DataOutputStream out, final HostInfo hostInfo) throws IOException {
out.writeUTF(hostInfo.host());
out.writeInt(hostInfo.port());
}
private void writeTopicPartitions(final DataOutputStream out,
final Set<TopicPartition> partitions) throws IOException {
out.writeInt(partitions.size());
@ -223,6 +265,14 @@ public class AssignmentInfo {
out.writeInt(errCode);
}
private void encodeVersionFive(final DataOutputStream out) throws IOException {
out.writeInt(5);
out.writeInt(LATEST_SUPPORTED_VERSION);
encodeActiveAndStandbyTaskAssignment(out);
encodePartitionsByHostAsDictionary(out);
out.writeInt(errCode);
}
/**
* @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown
*/
@ -254,6 +304,11 @@ public class AssignmentInfo {
assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
decodeVersionFourData(assignmentInfo, in);
break;
case 5:
latestSupportedVersion = in.readInt();
assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion);
decodeVersionFiveData(assignmentInfo, in);
break;
default:
final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " +
"used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@ -297,10 +352,10 @@ public class AssignmentInfo {
final DataInputStream in) throws IOException {
decodeActiveTasks(assignmentInfo, in);
decodeStandbyTasks(assignmentInfo, in);
decodeGlobalAssignmentData(assignmentInfo, in);
decodePartitionsByHost(assignmentInfo, in);
}
private static void decodeGlobalAssignmentData(final AssignmentInfo assignmentInfo,
private static void decodePartitionsByHost(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
assignmentInfo.partitionsByHost = new HashMap<>();
final int numEntries = in.readInt();
@ -319,11 +374,37 @@ public class AssignmentInfo {
return partitions;
}
private static void decodePartitionsByHostUsingDictionary(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
assignmentInfo.partitionsByHost = new HashMap<>();
final int dictSize = in.readInt();
final Map<Integer, String> topicIndexDict = new HashMap<>(dictSize);
for (int i = 0; i < dictSize; i++) {
topicIndexDict.put(in.readInt(), in.readUTF());
}
final int numEntries = in.readInt();
for (int i = 0; i < numEntries; i++) {
final HostInfo hostInfo = new HostInfo(in.readUTF(), in.readInt());
assignmentInfo.partitionsByHost.put(hostInfo, readTopicPartitions(in, topicIndexDict));
}
}
private static Set<TopicPartition> readTopicPartitions(final DataInputStream in,
final Map<Integer, String> topicIndexDict) throws IOException {
final int numPartitions = in.readInt();
final Set<TopicPartition> partitions = new HashSet<>(numPartitions);
for (int j = 0; j < numPartitions; j++) {
partitions.add(new TopicPartition(topicIndexDict.get(in.readInt()), in.readInt()));
}
return partitions;
}
private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
decodeActiveTasks(assignmentInfo, in);
decodeStandbyTasks(assignmentInfo, in);
decodeGlobalAssignmentData(assignmentInfo, in);
decodePartitionsByHost(assignmentInfo, in);
}
private static void decodeVersionFourData(final AssignmentInfo assignmentInfo,
@ -332,6 +413,14 @@ public class AssignmentInfo {
assignmentInfo.errCode = in.readInt();
}
private static void decodeVersionFiveData(final AssignmentInfo assignmentInfo,
final DataInputStream in) throws IOException {
decodeActiveTasks(assignmentInfo, in);
decodeStandbyTasks(assignmentInfo, in);
decodePartitionsByHostUsingDictionary(assignmentInfo, in);
assignmentInfo.errCode = in.readInt();
}
@Override
public int hashCode() {
return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode()
@ -359,7 +448,6 @@ public class AssignmentInfo {
+ ", supported version=" + latestSupportedVersion
+ ", active tasks=" + activeTasks
+ ", standby tasks=" + standbyTasks
+ ", global assignment=" + partitionsByHost + "]";
+ ", partitions by host=" + partitionsByHost + "]";
}
}

Просмотреть файл

@ -32,7 +32,7 @@ public class SubscriptionInfo {
private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
public static final int LATEST_SUPPORTED_VERSION = 4;
public static final int LATEST_SUPPORTED_VERSION = 5;
static final int UNKNOWN = -1;
private final int usedVersion;
@ -127,6 +127,9 @@ public class SubscriptionInfo {
case 4:
buf = encodeVersionFour();
break;
case 5:
buf = encodeVersionFive();
break;
default:
throw new IllegalStateException("Unknown metadata version: " + usedVersion
+ "; latest supported version: " + LATEST_SUPPORTED_VERSION);
@ -205,12 +208,11 @@ public class SubscriptionInfo {
}
}
private ByteBuffer encodeVersionThree() {
private ByteBuffer encodeVersionThreeFourAndFive(final int usedVersion) {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes));
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(3); // used version
buf.putInt(usedVersion); // used version
buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
@ -220,22 +222,19 @@ public class SubscriptionInfo {
return buf;
}
private ByteBuffer encodeVersionThree() {
return encodeVersionThreeFourAndFive(3);
}
private ByteBuffer encodeVersionFour() {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
buf.putInt(4); // used version
buf.putInt(LATEST_SUPPORTED_VERSION); // supported version
encodeClientUUID(buf);
encodeTasks(buf, prevTasks);
encodeTasks(buf, standbyTasks);
encodeUserEndPoint(buf, endPointBytes);
return buf;
return encodeVersionThreeFourAndFive(4);
}
protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) {
private ByteBuffer encodeVersionFive() {
return encodeVersionThreeFourAndFive(5);
}
protected int getVersionThreeFourAndFiveByteLength(final byte[] endPointBytes) {
return 4 + // used version
4 + // latest supported version version
16 + // client ID
@ -266,6 +265,7 @@ public class SubscriptionInfo {
break;
case 3:
case 4:
case 5:
latestSupportedVersion = data.getInt();
subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion);
decodeVersionThreeData(subscriptionInfo, data);

Просмотреть файл

@ -94,4 +94,11 @@ public class AssignmentInfoTest {
final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
@Test
public void shouldEncodeAndDecodeVersion5() {
final AssignmentInfo info = new AssignmentInfo(5, activeTasks, standbyTasks, globalAssignment, 2);
final AssignmentInfo expectedInfo = new AssignmentInfo(5, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2);
assertEquals(expectedInfo, AssignmentInfo.decode(info.encode()));
}
}

Просмотреть файл

@ -271,7 +271,7 @@ public class StreamsUpgradeTest {
private ByteBuffer encodeFutureVersion() {
final byte[] endPointBytes = prepareUserEndPoint();
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes));
final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeFourAndFiveByteLength(endPointBytes));
buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version
buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version

Просмотреть файл

@ -571,24 +571,24 @@ class StreamsUpgradeTest(Test):
monitors[first_other_processor] = first_other_monitor
monitors[second_other_processor] = second_other_monitor
leader_monitor.wait_until("Received a future (version probing) subscription (version: 5). Sending empty assignment back (with supported version 4).",
leader_monitor.wait_until("Received a future (version probing) subscription (version: 6). Sending empty assignment back (with supported version 5).",
timeout_sec=60,
err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account))
if len(self.old_processors) > 0:
log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
else:
log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Setting subscription metadata to leaders supported version 5 and trigger new rebalance.",
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Setting subscription metadata to leaders supported version 6 and trigger new rebalance.",
timeout_sec=60,
err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
first_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.",
first_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account))
second_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.",
err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account))
second_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.",
timeout_sec=60,
err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account))
err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account))
log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
timeout_sec=60,