зеркало из https://github.com/microsoft/kafka.git
KAFKA-8649: Send latest commonly supported version in assignment (#7423)
Instead of sending the leader's version and having older members try to blindly upgrade. The only other real change here is that we will also set the VERSION_PROBING error code and return early from onAssignment when we are upgrading our used subscription version (not just downgrading it) since this implies the whole group has finished the rolling upgrade and all members should rejoin with the new subscription version. Also piggy-backing on a fix for a potentially dangerous edge case, where every thread of an instance is assigned the same set of active tasks. Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
Родитель
dc7dd5f4fe
Коммит
8da69936a7
|
@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
|
|||
import org.apache.kafka.common.config.ConfigException;
|
||||
import org.apache.kafka.common.utils.LogContext;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.streams.StreamsConfig;
|
||||
import org.apache.kafka.streams.errors.StreamsException;
|
||||
import org.apache.kafka.streams.errors.TaskAssignmentException;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
|
@ -135,8 +136,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
void addConsumer(final String consumerMemberId,
|
||||
final SubscriptionInfo info) {
|
||||
consumers.add(consumerMemberId);
|
||||
state.addPreviousActiveTasks(info.prevTasks());
|
||||
state.addPreviousStandbyTasks(info.standbyTasks());
|
||||
state.addPreviousActiveTasks(consumerMemberId, info.prevTasks());
|
||||
state.addPreviousStandbyTasks(consumerMemberId, info.standbyTasks());
|
||||
state.incrementCapacity();
|
||||
}
|
||||
|
||||
|
@ -286,6 +287,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
final Set<String> futureConsumers = new HashSet<>();
|
||||
|
||||
int minReceivedMetadataVersion = LATEST_SUPPORTED_VERSION;
|
||||
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
|
||||
|
||||
int futureMetadataVersion = UNKNOWN;
|
||||
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
|
||||
|
@ -303,6 +305,11 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
minReceivedMetadataVersion = usedVersion;
|
||||
}
|
||||
|
||||
final int latestSupportedVersion = info.latestSupportedVersion();
|
||||
if (latestSupportedVersion < minSupportedMetadataVersion) {
|
||||
minSupportedMetadataVersion = latestSupportedVersion;
|
||||
}
|
||||
|
||||
// create the new client metadata if necessary
|
||||
ClientMetadata clientMetadata = clientMetadataMap.get(info.processId());
|
||||
|
||||
|
@ -572,14 +579,16 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
partitionsForTask,
|
||||
partitionsByHostState,
|
||||
futureConsumers,
|
||||
minReceivedMetadataVersion
|
||||
minReceivedMetadataVersion,
|
||||
minSupportedMetadataVersion
|
||||
);
|
||||
} else {
|
||||
assignment = computeNewAssignment(
|
||||
clientMetadataMap,
|
||||
partitionsForTask,
|
||||
partitionsByHostState,
|
||||
minReceivedMetadataVersion
|
||||
minReceivedMetadataVersion,
|
||||
minSupportedMetadataVersion
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -589,7 +598,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
private static Map<String, Assignment> computeNewAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
|
||||
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
|
||||
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
|
||||
final int minUserMetadataVersion) {
|
||||
final int minUserMetadataVersion,
|
||||
final int minSupportedMetadataVersion) {
|
||||
final Map<String, Assignment> assignment = new HashMap<>();
|
||||
|
||||
// within the client, distribute tasks to its owned consumers
|
||||
|
@ -605,17 +615,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
int consumerTaskIndex = 0;
|
||||
|
||||
for (final String consumer : consumers) {
|
||||
final List<TaskId> activeTasks = interleavedActive.get(consumerTaskIndex);
|
||||
|
||||
// These will be filled in by buildAssignedActiveTaskAndPartitionsList below
|
||||
final List<TopicPartition> activePartitionsList = new ArrayList<>();
|
||||
final List<TaskId> assignedActiveList = new ArrayList<>();
|
||||
|
||||
buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask);
|
||||
|
||||
final Map<TaskId, Set<TopicPartition>> standby = new HashMap<>();
|
||||
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
|
||||
|
||||
final List<TaskId> assignedActiveList = interleavedActive.get(consumerTaskIndex);
|
||||
|
||||
for (final TaskId taskId : assignedActiveList) {
|
||||
for (final TopicPartition partition : partitionsForTask.get(taskId)) {
|
||||
assignedPartitions.add(new AssignedPartition(taskId, partition));
|
||||
}
|
||||
}
|
||||
|
||||
if (!state.standbyTasks().isEmpty()) {
|
||||
final List<TaskId> assignedStandbyList = interleavedStandby.get(consumerTaskIndex);
|
||||
for (final TaskId taskId : assignedStandbyList) {
|
||||
|
@ -625,22 +633,15 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
|
||||
consumerTaskIndex++;
|
||||
|
||||
Collections.sort(assignedPartitions);
|
||||
final List<TaskId> active = new ArrayList<>();
|
||||
final List<TopicPartition> activePartitions = new ArrayList<>();
|
||||
for (final AssignedPartition partition : assignedPartitions) {
|
||||
active.add(partition.taskId);
|
||||
activePartitions.add(partition.partition);
|
||||
}
|
||||
|
||||
// finally, encode the assignment before sending back to coordinator
|
||||
assignment.put(
|
||||
consumer,
|
||||
new Assignment(
|
||||
activePartitions,
|
||||
activePartitionsList,
|
||||
new AssignmentInfo(
|
||||
minUserMetadataVersion,
|
||||
active,
|
||||
minSupportedMetadataVersion,
|
||||
assignedActiveList,
|
||||
standby,
|
||||
partitionsByHostState,
|
||||
0
|
||||
|
@ -657,7 +658,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
|
||||
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
|
||||
final Set<String> futureConsumers,
|
||||
final int minUserMetadataVersion) {
|
||||
final int minUserMetadataVersion,
|
||||
final int minSupportedMetadataVersion) {
|
||||
final Map<String, Assignment> assignment = new HashMap<>();
|
||||
|
||||
// assign previously assigned tasks to "old consumers"
|
||||
|
@ -668,23 +670,27 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
continue;
|
||||
}
|
||||
|
||||
final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasks());
|
||||
// Return the same active tasks that were claimed in the subscription
|
||||
final List<TaskId> activeTasks = new ArrayList<>(clientMetadata.state.prevActiveTasksForConsumer(consumerId));
|
||||
|
||||
final List<TopicPartition> assignedPartitions = new ArrayList<>();
|
||||
for (final TaskId taskId : activeTasks) {
|
||||
assignedPartitions.addAll(partitionsForTask.get(taskId));
|
||||
}
|
||||
// These will be filled in by buildAssignedActiveTaskAndPartitionsList below
|
||||
final List<TopicPartition> activePartitionsList = new ArrayList<>();
|
||||
final List<TaskId> assignedActiveList = new ArrayList<>();
|
||||
|
||||
buildAssignedActiveTaskAndPartitionsList(activeTasks, activePartitionsList, assignedActiveList, partitionsForTask);
|
||||
|
||||
// Return the same standby tasks that were claimed in the subscription
|
||||
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
|
||||
for (final TaskId taskId : clientMetadata.state.prevStandbyTasks()) {
|
||||
for (final TaskId taskId : clientMetadata.state.prevStandbyTasksForConsumer(consumerId)) {
|
||||
standbyTasks.put(taskId, partitionsForTask.get(taskId));
|
||||
}
|
||||
|
||||
assignment.put(consumerId, new Assignment(
|
||||
assignedPartitions,
|
||||
activePartitionsList,
|
||||
new AssignmentInfo(
|
||||
minUserMetadataVersion,
|
||||
activeTasks,
|
||||
minSupportedMetadataVersion,
|
||||
assignedActiveList,
|
||||
standbyTasks,
|
||||
partitionsByHostState,
|
||||
0)
|
||||
|
@ -697,13 +703,34 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
for (final String consumerId : futureConsumers) {
|
||||
assignment.put(consumerId, new Assignment(
|
||||
Collections.emptyList(),
|
||||
new AssignmentInfo().encode()
|
||||
new AssignmentInfo(minUserMetadataVersion, minSupportedMetadataVersion).encode()
|
||||
));
|
||||
}
|
||||
|
||||
return assignment;
|
||||
}
|
||||
|
||||
private static void buildAssignedActiveTaskAndPartitionsList(final List<TaskId> activeTasks,
|
||||
final List<TopicPartition> activePartitionsList,
|
||||
final List<TaskId> assignedActiveList,
|
||||
final Map<TaskId, Set<TopicPartition>> partitionsForTask) {
|
||||
final List<AssignedPartition> assignedPartitions = new ArrayList<>();
|
||||
|
||||
// Build up list of all assigned partition-task pairs
|
||||
for (final TaskId taskId : activeTasks) {
|
||||
for (final TopicPartition partition : partitionsForTask.get(taskId)) {
|
||||
assignedPartitions.add(new AssignedPartition(taskId, partition));
|
||||
}
|
||||
}
|
||||
|
||||
// Add one copy of a task for each corresponding partition, so the receiver can determine the task <-> tp mapping
|
||||
Collections.sort(assignedPartitions);
|
||||
for (final AssignedPartition partition : assignedPartitions) {
|
||||
assignedActiveList.add(partition.taskId);
|
||||
activePartitionsList.add(partition.partition);
|
||||
}
|
||||
}
|
||||
|
||||
// visible for testing
|
||||
static List<List<TaskId>> interleaveTasksByGroupId(final Collection<TaskId> taskIds, final int numberThreads) {
|
||||
final LinkedList<TaskId> sortedTasks = new LinkedList<>(taskIds);
|
||||
|
@ -724,15 +751,66 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
return taskIdsForConsumerAssignment;
|
||||
}
|
||||
|
||||
private void upgradeSubscriptionVersionIfNeeded(final int leaderSupportedVersion) {
|
||||
if (leaderSupportedVersion > usedSubscriptionMetadataVersion) {
|
||||
log.info("Sent a version {} subscription and group leader's latest supported version is {}. " +
|
||||
"Upgrading subscription metadata version to {} for next rebalance.",
|
||||
usedSubscriptionMetadataVersion,
|
||||
leaderSupportedVersion,
|
||||
leaderSupportedVersion);
|
||||
usedSubscriptionMetadataVersion = leaderSupportedVersion;
|
||||
private void validateMetadataVersions(final int receivedAssignmentMetadataVersion,
|
||||
final int latestCommonlySupportedVersion) {
|
||||
|
||||
if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
|
||||
log.error("Leader sent back an assignment with version {} which was greater than our used version {}",
|
||||
receivedAssignmentMetadataVersion, usedSubscriptionMetadataVersion);
|
||||
throw new TaskAssignmentException(
|
||||
"Sent a version " + usedSubscriptionMetadataVersion
|
||||
+ " subscription but got an assignment with higher version "
|
||||
+ receivedAssignmentMetadataVersion + "."
|
||||
);
|
||||
}
|
||||
|
||||
if (latestCommonlySupportedVersion > LATEST_SUPPORTED_VERSION) {
|
||||
log.error("Leader sent back assignment with commonly supported version {} that is greater than our "
|
||||
+ "actual latest supported version {}", latestCommonlySupportedVersion, LATEST_SUPPORTED_VERSION);
|
||||
throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
|
||||
}
|
||||
|
||||
if (receivedAssignmentMetadataVersion < EARLIEST_PROBEABLE_VERSION) {
|
||||
log.error("Leader sent back assignment with version {} which is less than the earliest probeable version {}. " +
|
||||
"To do a rolling upgrade from a version earlier than {} you must set the {} config",
|
||||
receivedAssignmentMetadataVersion, EARLIEST_PROBEABLE_VERSION, EARLIEST_PROBEABLE_VERSION,
|
||||
StreamsConfig.UPGRADE_FROM_CONFIG);
|
||||
throw new IllegalStateException("Can't use version probing with versions less than " + EARLIEST_PROBEABLE_VERSION);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns true if subscription version was changed, indicating version probing and need to rebalance again
|
||||
protected boolean maybeUpdateSubscriptionVersion(final int receivedAssignmentMetadataVersion,
|
||||
final int latestCommonlySupportedVersion) {
|
||||
// If the latest commonly supported version is now greater than our used version, this indicates we have just
|
||||
// completed the rolling upgrade and can now update our subscription version for the final rebalance
|
||||
if (latestCommonlySupportedVersion > usedSubscriptionMetadataVersion) {
|
||||
log.info(
|
||||
"Sent a version {} subscription and group's latest commonly supported version is {} (successful " +
|
||||
"version probing and end of rolling upgrade). Upgrading subscription metadata version to " +
|
||||
"{} for next rebalance.",
|
||||
usedSubscriptionMetadataVersion,
|
||||
latestCommonlySupportedVersion,
|
||||
latestCommonlySupportedVersion
|
||||
);
|
||||
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
|
||||
return true;
|
||||
}
|
||||
|
||||
// If we received a lower version than we sent, someone else in the group still hasn't upgraded. We
|
||||
// should downgrade our subscription until everyone is on the latest version
|
||||
if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion) {
|
||||
log.info(
|
||||
"Sent a version {} subscription and got version {} assignment back (successful version probing). " +
|
||||
"Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
|
||||
usedSubscriptionMetadataVersion,
|
||||
receivedAssignmentMetadataVersion
|
||||
);
|
||||
usedSubscriptionMetadataVersion = latestCommonlySupportedVersion;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -746,43 +824,27 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
|
||||
if (info.errCode() != AssignorError.NONE.code()) {
|
||||
// set flag to shutdown streams app
|
||||
assignmentErrorCode.set(info.errCode());
|
||||
setAssignmentErrorCode(info.errCode());
|
||||
return;
|
||||
}
|
||||
/*
|
||||
* latestCommonlySupportedVersion belongs to [usedSubscriptionMetadataVersion, LATEST_SUPPORTED_VERSION]
|
||||
* receivedAssignmentMetadataVersion belongs to [EARLIEST_PROBEABLE_VERSION, usedSubscriptionMetadataVersion]
|
||||
*
|
||||
* usedSubscriptionMetadataVersion will be downgraded to receivedAssignmentMetadataVersion during a rolling
|
||||
* bounce upgrade with version probing.
|
||||
*
|
||||
* usedSubscriptionMetadataVersion will be upgraded to latestCommonlySupportedVersion when all members have
|
||||
* been bounced and it is safe to use the latest version.
|
||||
*/
|
||||
final int receivedAssignmentMetadataVersion = info.version();
|
||||
final int leaderSupportedVersion = info.latestSupportedVersion();
|
||||
final int latestCommonlySupportedVersion = info.commonlySupportedVersion();
|
||||
|
||||
if (receivedAssignmentMetadataVersion > usedSubscriptionMetadataVersion) {
|
||||
throw new IllegalStateException(
|
||||
"Sent a version " + usedSubscriptionMetadataVersion
|
||||
+ " subscription but got an assignment with higher version "
|
||||
+ receivedAssignmentMetadataVersion + "."
|
||||
);
|
||||
}
|
||||
validateMetadataVersions(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion);
|
||||
|
||||
if (receivedAssignmentMetadataVersion < usedSubscriptionMetadataVersion
|
||||
&& receivedAssignmentMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
|
||||
|
||||
if (receivedAssignmentMetadataVersion == leaderSupportedVersion) {
|
||||
log.info(
|
||||
"Sent a version {} subscription and got version {} assignment back (successful version probing). " +
|
||||
"Downgrading subscription metadata to received version and trigger new rebalance.",
|
||||
usedSubscriptionMetadataVersion,
|
||||
receivedAssignmentMetadataVersion
|
||||
);
|
||||
usedSubscriptionMetadataVersion = receivedAssignmentMetadataVersion;
|
||||
} else {
|
||||
log.info(
|
||||
"Sent a version {} subscription and got version {} assignment back (successful version probing). " +
|
||||
"Setting subscription metadata to leaders supported version {} and trigger new rebalance.",
|
||||
usedSubscriptionMetadataVersion,
|
||||
receivedAssignmentMetadataVersion,
|
||||
leaderSupportedVersion
|
||||
);
|
||||
usedSubscriptionMetadataVersion = leaderSupportedVersion;
|
||||
}
|
||||
|
||||
assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
|
||||
// Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
|
||||
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
|
||||
setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -800,13 +862,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
partitionsByHost = Collections.emptyMap();
|
||||
break;
|
||||
case VERSION_TWO:
|
||||
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
|
||||
partitionsByHost = info.partitionsByHost();
|
||||
break;
|
||||
case VERSION_THREE:
|
||||
case VERSION_FOUR:
|
||||
case VERSION_FIVE:
|
||||
upgradeSubscriptionVersionIfNeeded(leaderSupportedVersion);
|
||||
processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
|
||||
partitionsByHost = info.partitionsByHost();
|
||||
break;
|
||||
|
@ -915,6 +973,10 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
|
|||
}
|
||||
}
|
||||
|
||||
protected void setAssignmentErrorCode(final Integer errorCode) {
|
||||
assignmentErrorCode.set(errorCode);
|
||||
}
|
||||
|
||||
|
||||
// following functions are for test only
|
||||
void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
|
||||
|
|
|
@ -72,9 +72,11 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
|
|||
);
|
||||
} else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) {
|
||||
log.debug(
|
||||
"Encountered assignment error during partition assignment: {}. Skipping task initialization",
|
||||
"Encountered assignment error during partition assignment: {}. Skipping task initialization and "
|
||||
+ "pausing any partitions we may have been assigned.",
|
||||
streamThread.getAssignmentErrorCode()
|
||||
);
|
||||
taskManager.pausePartitions();
|
||||
} else {
|
||||
// Close non-reassigned tasks before initializing new ones as we may have suspended active
|
||||
// tasks that become standbys or vice versa
|
||||
|
|
|
@ -129,8 +129,7 @@ public class TaskManager {
|
|||
}
|
||||
|
||||
// Pause all the new partitions until the underlying state store is ready for all the active tasks.
|
||||
log.trace("Pausing partitions: {}", assignment);
|
||||
consumer.pause(assignment);
|
||||
pausePartitions();
|
||||
}
|
||||
|
||||
private void resumeSuspended(final Collection<TopicPartition> assignment) {
|
||||
|
@ -366,6 +365,11 @@ public class TaskManager {
|
|||
return taskCreator.builder();
|
||||
}
|
||||
|
||||
void pausePartitions() {
|
||||
log.trace("Pausing partitions: {}", consumer.assignment());
|
||||
consumer.pause(consumer.assignment());
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws IllegalStateException If store gets registered after initialized is already finished
|
||||
* @throws StreamsException if the store's change log does not contain the partition
|
||||
|
|
|
@ -44,18 +44,21 @@ public class AssignmentInfo {
|
|||
private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class);
|
||||
|
||||
private final int usedVersion;
|
||||
private final int latestSupportedVersion;
|
||||
private final int commonlySupportedVersion;
|
||||
private int errCode;
|
||||
private List<TaskId> activeTasks;
|
||||
private Map<TaskId, Set<TopicPartition>> standbyTasks;
|
||||
private Map<HostInfo, Set<TopicPartition>> partitionsByHost;
|
||||
|
||||
// used for decoding; don't apply version checks
|
||||
private AssignmentInfo(final int version,
|
||||
final int latestSupportedVersion) {
|
||||
this.usedVersion = version;
|
||||
this.latestSupportedVersion = latestSupportedVersion;
|
||||
this.errCode = 0;
|
||||
// used for decoding and "future consumer" assignments during version probing
|
||||
public AssignmentInfo(final int version,
|
||||
final int commonlySupportedVersion) {
|
||||
this(version,
|
||||
commonlySupportedVersion,
|
||||
Collections.emptyList(),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
0);
|
||||
}
|
||||
|
||||
// for testing only
|
||||
|
@ -65,40 +68,31 @@ public class AssignmentInfo {
|
|||
this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, 0);
|
||||
}
|
||||
|
||||
// creates an empty assignment
|
||||
public AssignmentInfo() {
|
||||
this(LATEST_SUPPORTED_VERSION,
|
||||
Collections.emptyList(),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
0);
|
||||
}
|
||||
|
||||
public AssignmentInfo(final int version,
|
||||
final List<TaskId> activeTasks,
|
||||
final Map<TaskId, Set<TopicPartition>> standbyTasks,
|
||||
final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
|
||||
final int errCode) {
|
||||
this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, partitionsByHost, errCode);
|
||||
if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
|
||||
throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
|
||||
+ "; was: " + version);
|
||||
}
|
||||
}
|
||||
|
||||
// for testing only; don't apply version checks
|
||||
AssignmentInfo(final int version,
|
||||
final int latestSupportedVersion,
|
||||
final List<TaskId> activeTasks,
|
||||
final Map<TaskId, Set<TopicPartition>> standbyTasks,
|
||||
final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
|
||||
final int errCode) {
|
||||
public AssignmentInfo(final int version,
|
||||
final int commonlySupportedVersion,
|
||||
final List<TaskId> activeTasks,
|
||||
final Map<TaskId, Set<TopicPartition>> standbyTasks,
|
||||
final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
|
||||
final int errCode) {
|
||||
this.usedVersion = version;
|
||||
this.latestSupportedVersion = latestSupportedVersion;
|
||||
this.commonlySupportedVersion = commonlySupportedVersion;
|
||||
this.activeTasks = activeTasks;
|
||||
this.standbyTasks = standbyTasks;
|
||||
this.partitionsByHost = partitionsByHost;
|
||||
this.errCode = errCode;
|
||||
|
||||
if (version < 1 || version > LATEST_SUPPORTED_VERSION) {
|
||||
throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION
|
||||
+ "; was: " + version);
|
||||
}
|
||||
}
|
||||
|
||||
public int version() {
|
||||
|
@ -109,8 +103,8 @@ public class AssignmentInfo {
|
|||
return errCode;
|
||||
}
|
||||
|
||||
public int latestSupportedVersion() {
|
||||
return latestSupportedVersion;
|
||||
public int commonlySupportedVersion() {
|
||||
return commonlySupportedVersion;
|
||||
}
|
||||
|
||||
public List<TaskId> activeTasks() {
|
||||
|
@ -422,7 +416,7 @@ public class AssignmentInfo {
|
|||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode()
|
||||
return usedVersion ^ commonlySupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode()
|
||||
^ partitionsByHost.hashCode() ^ errCode;
|
||||
}
|
||||
|
||||
|
@ -431,7 +425,7 @@ public class AssignmentInfo {
|
|||
if (o instanceof AssignmentInfo) {
|
||||
final AssignmentInfo other = (AssignmentInfo) o;
|
||||
return usedVersion == other.usedVersion &&
|
||||
latestSupportedVersion == other.latestSupportedVersion &&
|
||||
commonlySupportedVersion == other.commonlySupportedVersion &&
|
||||
errCode == other.errCode &&
|
||||
activeTasks.equals(other.activeTasks) &&
|
||||
standbyTasks.equals(other.standbyTasks) &&
|
||||
|
@ -444,7 +438,7 @@ public class AssignmentInfo {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "[version=" + usedVersion
|
||||
+ ", supported version=" + latestSupportedVersion
|
||||
+ ", supported version=" + commonlySupportedVersion
|
||||
+ ", active tasks=" + activeTasks
|
||||
+ ", standby tasks=" + standbyTasks
|
||||
+ ", partitions by host=" + partitionsByHost + "]";
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.kafka.streams.processor.internals.assignment;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.streams.processor.TaskId;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
@ -29,15 +31,25 @@ public class ClientState {
|
|||
private final Set<TaskId> prevStandbyTasks;
|
||||
private final Set<TaskId> prevAssignedTasks;
|
||||
|
||||
private int capacity;
|
||||
private final Map<String, Set<TaskId>> prevActiveTasksByConsumer;
|
||||
private final Map<String, Set<TaskId>> prevStandbyTasksByConsumer;
|
||||
|
||||
private int capacity;
|
||||
|
||||
public ClientState() {
|
||||
this(0);
|
||||
}
|
||||
|
||||
ClientState(final int capacity) {
|
||||
this(new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), new HashSet<>(), capacity);
|
||||
this(new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
new HashSet<>(),
|
||||
new HashMap<>(),
|
||||
new HashMap<>(),
|
||||
capacity);
|
||||
}
|
||||
|
||||
private ClientState(final Set<TaskId> activeTasks,
|
||||
|
@ -46,6 +58,8 @@ public class ClientState {
|
|||
final Set<TaskId> prevActiveTasks,
|
||||
final Set<TaskId> prevStandbyTasks,
|
||||
final Set<TaskId> prevAssignedTasks,
|
||||
final Map<String, Set<TaskId>> prevActiveTasksByConsumer,
|
||||
final Map<String, Set<TaskId>> prevStandbyTasksByConsumer,
|
||||
final int capacity) {
|
||||
this.activeTasks = activeTasks;
|
||||
this.standbyTasks = standbyTasks;
|
||||
|
@ -53,6 +67,8 @@ public class ClientState {
|
|||
this.prevActiveTasks = prevActiveTasks;
|
||||
this.prevStandbyTasks = prevStandbyTasks;
|
||||
this.prevAssignedTasks = prevAssignedTasks;
|
||||
this.prevActiveTasksByConsumer = prevActiveTasksByConsumer;
|
||||
this.prevStandbyTasksByConsumer = prevStandbyTasksByConsumer;
|
||||
this.capacity = capacity;
|
||||
}
|
||||
|
||||
|
@ -64,6 +80,8 @@ public class ClientState {
|
|||
new HashSet<>(prevActiveTasks),
|
||||
new HashSet<>(prevStandbyTasks),
|
||||
new HashSet<>(prevAssignedTasks),
|
||||
new HashMap<>(prevActiveTasksByConsumer),
|
||||
new HashMap<>(prevStandbyTasksByConsumer),
|
||||
capacity);
|
||||
}
|
||||
|
||||
|
@ -107,14 +125,24 @@ public class ClientState {
|
|||
return activeTasks.size();
|
||||
}
|
||||
|
||||
public void addPreviousActiveTasks(final Set<TaskId> prevTasks) {
|
||||
public void addPreviousActiveTasks(final String consumer, final Set<TaskId> prevTasks) {
|
||||
prevActiveTasks.addAll(prevTasks);
|
||||
prevAssignedTasks.addAll(prevTasks);
|
||||
prevActiveTasksByConsumer.put(consumer, prevTasks);
|
||||
}
|
||||
|
||||
public void addPreviousStandbyTasks(final Set<TaskId> standbyTasks) {
|
||||
public void addPreviousStandbyTasks(final String consumer, final Set<TaskId> standbyTasks) {
|
||||
prevStandbyTasks.addAll(standbyTasks);
|
||||
prevAssignedTasks.addAll(standbyTasks);
|
||||
prevStandbyTasksByConsumer.put(consumer, standbyTasks);
|
||||
}
|
||||
|
||||
public Set<TaskId> prevActiveTasksForConsumer(final String consumer) {
|
||||
return prevActiveTasksByConsumer.get(consumer);
|
||||
}
|
||||
|
||||
public Set<TaskId> prevStandbyTasksForConsumer(final String consumer) {
|
||||
return prevStandbyTasksByConsumer.get(consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1236,7 +1236,7 @@ public class StreamsPartitionAssignorTest {
|
|||
)));
|
||||
assertThat(assignment.get("consumer1").partitions(), equalTo(asList(t1p0, t1p1)));
|
||||
|
||||
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo()));
|
||||
assertThat(AssignmentInfo.decode(assignment.get("future-consumer").userData()), equalTo(new AssignmentInfo(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION)));
|
||||
assertThat(assignment.get("future-consumer").partitions().size(), equalTo(0));
|
||||
}
|
||||
|
||||
|
|
|
@ -319,6 +319,7 @@ public class TaskManagerTest {
|
|||
@Test
|
||||
public void shouldPauseActivePartitions() {
|
||||
mockSingleActiveTask();
|
||||
expect(consumer.assignment()).andReturn(taskId0Partitions).times(2);
|
||||
consumer.pause(taskId0Partitions);
|
||||
expectLastCall();
|
||||
replay();
|
||||
|
|
|
@ -70,7 +70,7 @@ public class ClientStateTest {
|
|||
final TaskId tid1 = new TaskId(0, 1);
|
||||
final TaskId tid2 = new TaskId(0, 2);
|
||||
|
||||
client.addPreviousActiveTasks(Utils.mkSet(tid1, tid2));
|
||||
client.addPreviousActiveTasks("consumer", Utils.mkSet(tid1, tid2));
|
||||
assertThat(client.previousActiveTasks(), equalTo(Utils.mkSet(tid1, tid2)));
|
||||
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2)));
|
||||
}
|
||||
|
@ -80,7 +80,7 @@ public class ClientStateTest {
|
|||
final TaskId tid1 = new TaskId(0, 1);
|
||||
final TaskId tid2 = new TaskId(0, 2);
|
||||
|
||||
client.addPreviousStandbyTasks(Utils.mkSet(tid1, tid2));
|
||||
client.addPreviousStandbyTasks("consumer", Utils.mkSet(tid1, tid2));
|
||||
assertThat(client.previousActiveTasks().size(), equalTo(0));
|
||||
assertThat(client.previousAssignedTasks(), equalTo(Utils.mkSet(tid1, tid2)));
|
||||
}
|
||||
|
|
|
@ -207,11 +207,11 @@ public class StickyTaskAssignorTest {
|
|||
@Test
|
||||
public void shouldAssignTasksToClientWithPreviousStandbyTasks() {
|
||||
final ClientState client1 = createClient(p1, 1);
|
||||
client1.addPreviousStandbyTasks(Utils.mkSet(task02));
|
||||
client1.addPreviousStandbyTasks("consumer", Utils.mkSet(task02));
|
||||
final ClientState client2 = createClient(p2, 1);
|
||||
client2.addPreviousStandbyTasks(Utils.mkSet(task01));
|
||||
client2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
|
||||
final ClientState client3 = createClient(p3, 1);
|
||||
client3.addPreviousStandbyTasks(Utils.mkSet(task00));
|
||||
client3.addPreviousStandbyTasks("consumer", Utils.mkSet(task00));
|
||||
|
||||
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
|
||||
|
||||
|
@ -225,9 +225,9 @@ public class StickyTaskAssignorTest {
|
|||
@Test
|
||||
public void shouldAssignBasedOnCapacityWhenMultipleClientHaveStandbyTasks() {
|
||||
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00);
|
||||
c1.addPreviousStandbyTasks(Utils.mkSet(task01));
|
||||
c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
|
||||
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 2, task02);
|
||||
c2.addPreviousStandbyTasks(Utils.mkSet(task01));
|
||||
c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01));
|
||||
|
||||
final StickyTaskAssignor taskAssignor = createTaskAssignor(task00, task01, task02);
|
||||
|
||||
|
@ -455,9 +455,9 @@ public class StickyTaskAssignorTest {
|
|||
@Test
|
||||
public void shouldNotHaveSameAssignmentOnAnyTwoHostsWhenThereArePreviousStandbyTasks() {
|
||||
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task02);
|
||||
c1.addPreviousStandbyTasks(Utils.mkSet(task03, task00));
|
||||
c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task00));
|
||||
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task03, task00);
|
||||
c2.addPreviousStandbyTasks(Utils.mkSet(task01, task02));
|
||||
c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task02));
|
||||
|
||||
createClient(p3, 1);
|
||||
createClient(p4, 1);
|
||||
|
@ -577,14 +577,14 @@ public class StickyTaskAssignorTest {
|
|||
final TaskId task23 = new TaskId(2, 3);
|
||||
|
||||
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
|
||||
c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
|
||||
c1.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task11, task20, task21, task23));
|
||||
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
|
||||
c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
|
||||
c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
|
||||
final ClientState c3 = createClientWithPreviousActiveTasks(p3, 1, task20, task21, task23);
|
||||
c3.addPreviousStandbyTasks(Utils.mkSet(task02, task12));
|
||||
c3.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task12));
|
||||
|
||||
final ClientState newClient = createClient(p4, 1);
|
||||
newClient.addPreviousStandbyTasks(Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23));
|
||||
newClient.addPreviousStandbyTasks("consumer", Utils.mkSet(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23));
|
||||
|
||||
final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
|
||||
taskAssignor.assign(0);
|
||||
|
@ -607,15 +607,15 @@ public class StickyTaskAssignorTest {
|
|||
final TaskId task23 = new TaskId(2, 3);
|
||||
|
||||
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task01, task12, task13);
|
||||
c1.addPreviousStandbyTasks(Utils.mkSet(task00, task11, task20, task21, task23));
|
||||
c1.addPreviousStandbyTasks("c1onsumer", Utils.mkSet(task00, task11, task20, task21, task23));
|
||||
final ClientState c2 = createClientWithPreviousActiveTasks(p2, 1, task00, task11, task22);
|
||||
c2.addPreviousStandbyTasks(Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
|
||||
c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task01, task10, task02, task20, task03, task12, task21, task13, task23));
|
||||
|
||||
final ClientState bounce1 = createClient(p3, 1);
|
||||
bounce1.addPreviousStandbyTasks(Utils.mkSet(task20, task21, task23));
|
||||
bounce1.addPreviousStandbyTasks("consumer", Utils.mkSet(task20, task21, task23));
|
||||
|
||||
final ClientState bounce2 = createClient(p4, 1);
|
||||
bounce2.addPreviousStandbyTasks(Utils.mkSet(task02, task03, task10));
|
||||
bounce2.addPreviousStandbyTasks("consumer", Utils.mkSet(task02, task03, task10));
|
||||
|
||||
final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task10, task01, task02, task11, task20, task03, task12, task21, task13, task22, task23);
|
||||
taskAssignor.assign(0);
|
||||
|
@ -658,7 +658,7 @@ public class StickyTaskAssignorTest {
|
|||
final TaskId task06 = new TaskId(0, 6);
|
||||
final ClientState c1 = createClientWithPreviousActiveTasks(p1, 1, task00, task01, task02, task06);
|
||||
final ClientState c2 = createClient(p2, 1);
|
||||
c2.addPreviousStandbyTasks(Utils.mkSet(task03, task04, task05));
|
||||
c2.addPreviousStandbyTasks("consumer", Utils.mkSet(task03, task04, task05));
|
||||
final ClientState newClient = createClient(p3, 1);
|
||||
|
||||
final StickyTaskAssignor<Integer> taskAssignor = createTaskAssignor(task00, task01, task02, task03, task04, task05, task06);
|
||||
|
@ -705,7 +705,7 @@ public class StickyTaskAssignorTest {
|
|||
|
||||
private ClientState createClientWithPreviousActiveTasks(final Integer processId, final int capacity, final TaskId... taskIds) {
|
||||
final ClientState clientState = new ClientState(capacity);
|
||||
clientState.addPreviousActiveTasks(Utils.mkSet(taskIds));
|
||||
clientState.addPreviousActiveTasks("consumer", Utils.mkSet(taskIds));
|
||||
clients.put(processId, clientState);
|
||||
return clientState;
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
|
|||
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
|
||||
import org.apache.kafka.streams.processor.internals.TaskManager;
|
||||
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
|
||||
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
|
||||
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
|
||||
import org.apache.kafka.streams.state.HostInfo;
|
||||
|
||||
|
@ -167,6 +168,11 @@ public class StreamsUpgradeTest {
|
|||
final AssignmentInfo info = AssignmentInfo.decode(
|
||||
assignment.userData().putInt(0, LATEST_SUPPORTED_VERSION));
|
||||
|
||||
if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
|
||||
setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
|
||||
return;
|
||||
}
|
||||
|
||||
final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
|
||||
partitions.sort(PARTITION_COMPARATOR);
|
||||
|
||||
|
@ -305,6 +311,7 @@ public class StreamsUpgradeTest {
|
|||
private FutureAssignmentInfo(final boolean bumpUsedVersion,
|
||||
final boolean bumpSupportedVersion,
|
||||
final ByteBuffer bytes) {
|
||||
super(LATEST_SUPPORTED_VERSION, LATEST_SUPPORTED_VERSION);
|
||||
this.bumpUsedVersion = bumpUsedVersion;
|
||||
this.bumpSupportedVersion = bumpSupportedVersion;
|
||||
originalUserMetadata = bytes;
|
||||
|
|
|
@ -355,8 +355,6 @@ class StreamsUpgradeTest(Test):
|
|||
for p in self.processors:
|
||||
found = list(p.node.account.ssh_capture("grep \"Finished assignment for group\" %s" % p.LOG_FILE, allow_fail=True))
|
||||
if len(found) >= self.leader_counter[p] + 1:
|
||||
if self.leader is not None:
|
||||
raise Exception("Could not uniquely identify leader")
|
||||
self.leader = p
|
||||
self.leader_counter[p] = self.leader_counter[p] + 1
|
||||
|
||||
|
@ -557,38 +555,25 @@ class StreamsUpgradeTest(Test):
|
|||
else:
|
||||
self.leader_counter[self.leader] = self.leader_counter[self.leader] + 1
|
||||
|
||||
if processor == self.leader:
|
||||
leader_monitor = log_monitor
|
||||
elif first_other_processor == self.leader:
|
||||
leader_monitor = first_other_monitor
|
||||
elif second_other_processor == self.leader:
|
||||
leader_monitor = second_other_monitor
|
||||
else:
|
||||
raise Exception("Could not identify leader.")
|
||||
|
||||
monitors = {}
|
||||
monitors[processor] = log_monitor
|
||||
monitors[first_other_processor] = first_other_monitor
|
||||
monitors[second_other_processor] = second_other_monitor
|
||||
|
||||
leader_monitor.wait_until("Received a future (version probing) subscription (version: 6). Sending empty assignment back (with supported version 5).",
|
||||
timeout_sec=60,
|
||||
err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account))
|
||||
|
||||
if len(self.old_processors) > 0:
|
||||
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.",
|
||||
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
|
||||
timeout_sec=60,
|
||||
err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
|
||||
else:
|
||||
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Setting subscription metadata to leaders supported version 6 and trigger new rebalance.",
|
||||
log_monitor.wait_until("Sent a version 6 subscription and got version 5 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version and trigger new rebalance.",
|
||||
timeout_sec=60,
|
||||
err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account))
|
||||
first_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.",
|
||||
first_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",
|
||||
timeout_sec=60,
|
||||
err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(first_other_node.account))
|
||||
second_other_monitor.wait_until("Sent a version 5 subscription and group leader.s latest supported version is 6. Upgrading subscription metadata version to 6 for next rebalance.",
|
||||
err_msg="Never saw output 'Upgrade metadata to version 6' on" + str(first_other_node.account))
|
||||
second_other_monitor.wait_until("Sent a version 5 subscription and group.s latest commonly supported version is 6 (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to 6 for next rebalance.",
|
||||
timeout_sec=60,
|
||||
err_msg="Never saw output 'Upgrade metadata to version 5' on" + str(second_other_node.account))
|
||||
err_msg="Never saw output 'Upgrade metadata to version 6' on" + str(second_other_node.account))
|
||||
|
||||
log_monitor.wait_until("Version probing detected. Triggering new rebalance.",
|
||||
timeout_sec=60,
|
||||
|
|
Загрузка…
Ссылка в новой задаче