KAFKA-6205: initialize topology after state stores restoration completed

Initialize topology after state store restoration.
Although, IMHO, updating some of the existing tests already demonstrates the correct order of operations, I will probably add an integration test as well; I wanted to get this PR in now for feedback on the approach.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>

Closes #4415 from bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology

minor log4j edits
This commit is contained in:
Bill Bejeck 2018-01-26 09:24:40 -08:00 коммит произвёл Guozhang Wang
Родитель d673c8cf94
Коммит c615c597aa
12 изменённых файлов: 101 добавлений и 51 удалений

Просмотреть файл

@ -196,10 +196,11 @@ public abstract class AbstractTask implements Task {
}
/**
* @throws IllegalStateException If a store gets registered after initialization has already finished
* @throws StreamsException if the store's change log does not contain the partition
*
* Package-private for testing only
*/
void initializeStateStores() {
void registerStateStores() {
if (topology.stateStores().isEmpty()) {
return;
}

Просмотреть файл

@ -99,7 +99,7 @@ abstract class AssignedTasks<T extends Task> {
for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, T> entry = it.next();
try {
if (!entry.getValue().initialize()) {
if (!entry.getValue().initializeStateStores()) {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
addToRestoring(entry.getValue());
} else {
@ -272,6 +272,7 @@ abstract class AssignedTasks<T extends Task> {
private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
if (task.hasStateStores()) {

Просмотреть файл

@ -62,14 +62,20 @@ public class StandbyTask extends AbstractTask {
}
@Override
public boolean initialize() {
initializeStateStores();
public boolean initializeStateStores() {
log.trace("Initializing state stores");
registerStateStores();
checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
processorContext.initialized();
taskInitialized = true;
return true;
}
@Override
public void initializeTopology() {
//no-op
}
/**
* <pre>
* - update offset limits

Просмотреть файл

@ -161,15 +161,18 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
@Override
public boolean initialize() {
log.trace("Initializing");
initializeStateStores();
initTopology();
processorContext.initialized();
taskInitialized = true;
public boolean initializeStateStores() {
log.trace("Initializing state stores");
registerStateStores();
return changelogPartitions().isEmpty();
}
@Override
public void initializeTopology() {
initTopology();
processorContext.initialized();
taskInitialized = true;
}
/**
* <pre>

Просмотреть файл

@ -32,7 +32,9 @@ public interface Task {
* @throws IllegalStateException If a store gets registered after initialization has already finished
* @throws StreamsException if the store's change log does not contain the partition
*/
boolean initialize();
boolean initializeStateStores();
void initializeTopology();
void commit();

Просмотреть файл

@ -327,6 +327,7 @@ class TaskManager {
standby.initializeNewTasks();
final Collection<TopicPartition> restored = changelogReader.restore(active);
resumed.addAll(active.updateRestored(restored));
if (!resumed.isEmpty()) {

Просмотреть файл

@ -101,7 +101,7 @@ public class AbstractTaskTest {
final AbstractTask task = createTask(consumer, Collections.singletonMap(store, "dummy"));
try {
task.initializeStateStores();
task.registerStateStores();
fail("Should have thrown LockException");
} catch (final LockException e) {
// ok
@ -116,7 +116,7 @@ public class AbstractTaskTest {
final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap());
task.initializeStateStores();
task.registerStateStores();
// should fail if lock is called
EasyMock.verify(stateDirectory);
@ -254,9 +254,12 @@ public class AbstractTaskTest {
public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {}
@Override
public boolean initialize() {
public boolean initializeStateStores() {
return false;
}
@Override
public void initializeTopology() {}
};
}

Просмотреть файл

@ -89,7 +89,7 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldInitializeNewTasks() {
EasyMock.expect(t1.initialize()).andReturn(false);
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
EasyMock.replay(t1);
@ -101,10 +101,14 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
EasyMock.expect(t1.initialize()).andReturn(false);
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
EasyMock.expect(t2.initialize()).andReturn(true);
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
EasyMock.expect(t2.partitions()).andReturn(t2partitions);
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@ -125,7 +129,9 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
EasyMock.expect(t2.initialize()).andReturn(true);
EasyMock.expect(t2.initializeStateStores()).andReturn(true);
t2.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
EasyMock.expect(t2.hasStateStores()).andReturn(false);
@ -142,10 +148,12 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldTransitionFullyRestoredTasksToRunning() {
final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
EasyMock.expect(t1.initialize()).andReturn(false);
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.replay(t1);
addAndInitTask();
@ -169,7 +177,7 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseRestoringTasks() {
EasyMock.expect(t1.initialize()).andReturn(false);
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
t1.close(false, false);
@ -236,6 +244,8 @@ public class AssignedStreamsTasksTest {
mockRunningTaskSuspension();
t1.resume();
EasyMock.expectLastCall();
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
@ -266,7 +276,9 @@ public class AssignedStreamsTasksTest {
}
private void mockTaskInitialization() {
EasyMock.expect(t1.initialize()).andReturn(true);
EasyMock.expect(t1.initializeStateStores()).andReturn(true);
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
EasyMock.expect(t1.hasStateStores()).andReturn(false);
@ -449,7 +461,9 @@ public class AssignedStreamsTasksTest {
}
private void mockRunningTaskSuspension() {
EasyMock.expect(t1.initialize()).andReturn(true);
EasyMock.expect(t1.initializeStateStores()).andReturn(true);
t1.initializeTopology();
EasyMock.expectLastCall().once();
EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();

Просмотреть файл

@ -155,7 +155,7 @@ public class StandbyTaskTest {
public void testStorePartitions() throws IOException {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initialize();
task.initializeStateStores();
assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
}
@ -177,7 +177,7 @@ public class StandbyTaskTest {
public void testUpdate() throws IOException {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory);
task.initialize();
task.initializeStateStores();
final Set<TopicPartition> partition = Collections.singleton(partition2);
restoreStateConsumer.assign(partition);
@ -223,7 +223,7 @@ public class StandbyTaskTest {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory);
task.initialize();
task.initializeStateStores();
restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@ -344,7 +344,7 @@ public class StandbyTaskTest {
null,
stateDirectory
);
task.initialize();
task.initializeStateStores();
restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
@ -396,7 +396,7 @@ public class StandbyTaskTest {
closedStateManager.set(true);
}
};
task.initialize();
task.initializeStateStores();
try {
task.close(true, false);
fail("should have thrown exception");

Просмотреть файл

@ -318,7 +318,8 @@ public class StreamTaskTest {
@Test
public void testMaybePunctuateStreamTime() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@ -384,7 +385,8 @@ public class StreamTaskTest {
@Test
public void testCancelPunctuateStreamTime() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@ -416,7 +418,8 @@ public class StreamTaskTest {
@Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@ -430,7 +433,8 @@ public class StreamTaskTest {
@Test
public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
long now = time.milliseconds();
assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate
time.sleep(9);
@ -441,7 +445,8 @@ public class StreamTaskTest {
@Test
public void testCancelPunctuateSystemTime() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@ -454,7 +459,8 @@ public class StreamTaskTest {
@Test
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
task = createTaskThatThrowsException();
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.addRecords(partition2, Collections.singletonList(
new ConsumerRecord<>(partition2.topic(), partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@ -469,7 +475,8 @@ public class StreamTaskTest {
@Test
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, new Punctuator() {
@ -489,7 +496,8 @@ public class StreamTaskTest {
@Test
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@ -531,7 +539,8 @@ public class StreamTaskTest {
@Test
public void shouldCheckpointOffsetsOnCommit() throws IOException {
task = createStatefulTask(false, true);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.commit();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
@ -542,7 +551,8 @@ public class StreamTaskTest {
@Test
public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
task = createStatefulTask(true, true);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.commit();
final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME);
@ -553,7 +563,8 @@ public class StreamTaskTest {
@Test
public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.processorContext.setCurrentNode(processorStreamTime);
try {
task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@ -566,7 +577,8 @@ public class StreamTaskTest {
@Test
public void shouldCallPunctuateOnPassedInProcessorNode() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
assertThat(punctuatedAt, equalTo(5L));
task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator);
@ -576,7 +588,8 @@ public class StreamTaskTest {
@Test
public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
task = createStatelessTask(false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator);
assertThat(((ProcessorContextImpl) task.context()).currentNode(), nullValue());
}
@ -607,7 +620,8 @@ public class StreamTaskTest {
@Test
public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
task = createTaskThatThrowsException();
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.close(true, false);
fail("should have thrown runtime exception");
@ -760,7 +774,8 @@ public class StreamTaskTest {
@Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
task = createTaskThatThrowsException();
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.commit();
@ -774,7 +789,8 @@ public class StreamTaskTest {
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
final StreamTask task = createTaskThatThrowsException();
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.suspend();
fail("should have thrown an exception");
@ -786,7 +802,8 @@ public class StreamTaskTest {
@Test
public void shouldCloseStateManagerIfFailureOnTaskClose() {
task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
try {
task.close(true, false);
@ -813,14 +830,14 @@ public class StreamTaskTest {
public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
final StreamTask task = createStatefulTask(false, false);
assertTrue(task.initialize());
assertTrue(task.initializeStateStores());
}
@Test
public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
final StreamTask task = createStatefulTask(false, true);
assertFalse(task.initialize());
assertFalse(task.initializeStateStores());
}
@Test
@ -840,7 +857,8 @@ public class StreamTaskTest {
task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), topology, consumer, changelogReader, config,
streamsMetrics, stateDirectory, null, time, producer);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));

Просмотреть файл

@ -103,12 +103,12 @@ public class StreamThreadStateStoreProviderTest {
stateDirectory = new StateDirectory(streamsConfig, new MockTime());
taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
new TaskId(0, 0));
taskOne.initialize();
taskOne.initializeStateStores();
tasks.put(new TaskId(0, 0),
taskOne);
taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
new TaskId(0, 1));
taskTwo.initialize();
taskTwo.initializeStateStores();
tasks.put(new TaskId(0, 1),
taskTwo);

Просмотреть файл

@ -248,7 +248,8 @@ public class ProcessorTopologyTestDriver {
cache,
new MockTime(),
producer);
task.initialize();
task.initializeStateStores();
task.initializeTopology();
}
}