-
Notifications
You must be signed in to change notification settings - Fork 14.7k
KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… #19275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… #19275
Conversation
…Thread#runOnceWithoutProcessingThreads flow. Removed code of the method StreamThread#initializeAndRestorePhase, since it was only used in the negation flow. Also removed the flag entirely from the StreamThread#runOnceWithoutProcessingThreads method.
Hi @cadonna , Would you be able to verify if the changes I have done are correct, like is this what is required? Also TaskManager#needsInitializationOrRestoration had only one usage through StreamThread#initializeAndRestorePhase method (which was removed). And TaskManager#tryToCompleteRestoration also had only one usage but lot of test cases around 53 I guess. So wanted to know if I should go levels below and remove the code or just top level code. I will do the changes to remove the flag from the other places and commit them, in the meantime would be great if you can verify if my approach is correct. Also could you also start the workflow as well? |
…Thread#create flow. Removed the stateUpdaterEnabled dependency completely from the StreamThread#create methods and all the inner methods that use it
…Thread#maybeGetClientInstanceIds, StreamThread#pollPhase and StreamThread#clientInstanceIds flow. Removed the stateUpdaterEnabled dependency completely from the StreamThread#maybeGetClientInstanceIds, StreamThread#pollPhase and StreamThread#clientInstanceIds flow. Also with this commit, have removed the flag dependency completely from the StreamThread and StreamThreadTest classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, @janchilling !
I started the review, but was not able to finish it. Nevertheless, I would like to share the comments up until now with you. Most comments are about additional indentation. We use 4 spaces and not 8. There are also some other comments about the code.
I will proceed with the review next week. Feel free to update the PR in the meanwhile.
final Runnable shutdownErrorHook, | ||
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) { | ||
|
||
final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please also remove the internal config _state.updater.enabled_
and all corresponding code from StreamsConfig
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will attend to this within this week
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is still a system test that uses the config. It is streams_upgrade_test.test_upgrade_downgrade_state_updater()
. There is a comment that says:
Once same-thread state restoration is removed from the code, this test
should use different versions of the code.
I guess it means to only use a version before 3.8
(e.g. LATEST_3_7
) for the from_version
and DEV_VERSION
for the to_version
. You need to choose a version before 3.8
because before 3.8
the state updater was not enabled by default.
@lucasbru did I correctly interpret your comment?
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
Outdated
Show resolved
Hide resolved
Hi @cadonna , Thank you for the review! Really sorry that you had to go through the indentation thing, I must have selected the whole file by accident and clicked ctrl+alt+l! Anyway the indentation thing is now resolved in all places and some other reviews has also been resolved now. I'll attend to the rest(Removing the flag from the rest of the codebase) within this week. |
…er to createAndStartStateUpdater since it now always creates and starts the state updater
…pdated a redundant part of a condition check and an indentation update
…er and StateChangeLogReaderTest classes. Removed the boolean for useNonBlockingPoll since it will always be false(!stateUpdaterEnabled is always false). Therefore, the poll method will always use the polltime. Test cases in the StateChangelogReaderTest has also been updated to match the above scenario. In the Test class the methods shouldPollWithRightTimeoutWithoutStateUpdater and shouldPollWithRightTimeout(final boolean stateUpdaterEnabled, final Task.TaskType type) is redundant since the current default behavior of stateUpdater always true will be checked by shouldPollWithRightTimeoutWithStateUpdaterDefault method.
… the StateChangeLogReader and StateChangeLogReaderTest classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@janchilling Thanks for the updates!
Here my comments.
|
||
if (!stateUpdaterEnabled && changelogReader != null) { | ||
if (changelogReader != null) { | ||
changelogReader.unregister(getAllChangelogTopicPartitions()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since stateUpdaterEnabled
is basically always true
if we remove the flag, the if
-condition should always be false
and the code guarded by the if
-condition should never be executed.
IMO, the changelog reader can be removed from the ProcessorStateManager
since registering changelog topics is done in the state updater. Only the old code path that did not use the state updater needed the changelog reader here.
If the ProcessorStateManager
does not need the changelog reader, the active task creator and the standby task creator do also not need the changelog reader.
|
||
if (!stateUpdaterEnabled && changelogReader != null) { | ||
if (changelogReader != null) { | ||
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions(); | ||
changelogReader.unregister(allChangelogs); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my above comment.
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; | ||
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); | ||
final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId; | ||
final String restorationThreadId = stateUpdaterId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could directly use stateUpdaterId
instead of restorationThreadId
since the distinction between state updater ID and thread ID for restoration does not hold anymore.
|
||
if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) { | ||
if (state == State.PARTITIONS_ASSIGNED) { | ||
// try to fetch some records with zero poll millis | ||
// to unblock the restoration as soon as possible | ||
records = pollRequests(Duration.ZERO); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can get rid of this branch. Polling with duration zero during PARTITIONS_ASSIGNED
only applies to the old code path. With the state updater polling should use the configured poll time as stated on line 1153.
|
||
@ParameterizedTest | ||
@EnumSource(value = Task.TaskType.class, names = {"ACTIVE", "STANDBY"}) | ||
public void shouldPollWithRightTimeoutWithStateUpdaterDefault(final Task.TaskType type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please include shouldPollWithRightTimeout()
into this test and rename this test to shouldPollWithRightTimeout()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cadonna Not clear. You mean just change the name of shouldPollWithRightTimeoutWithStateUpdaterDefault()
to shouldPollWithRightTimeout()
? or bring back shouldPollWithRightTimeout()
as it was earlier?
The reason I removed shouldPollWithRightTimeout()
method was since shouldPollWithRightTimeoutWithStateUpdaterDefault()
also has the same behavior and they were checking the same thing basically. So redundant code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant to copy the content of shouldPollWithRightTimeout(final Properties properties, final Task.TaskType type)
(below this method) into this method since shouldPollWithRightTimeout()
is only called in this method. Then rename this method from shouldPollWithRightTimeoutWithStateUpdaterDefault()
to shouldPollWithRightTimeout()
.
() -> thread.taskManager().checkStateUpdater( | ||
mockTime.milliseconds(), | ||
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) | ||
), | ||
10 * 1000, | ||
"State updater never returned tasks."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please fix the indentation here.
() -> thread.taskManager().checkStateUpdater( | ||
mockTime.milliseconds(), | ||
topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) | ||
), | ||
10 * 1000, | ||
"State updater never returned tasks."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my comment above about indentation.
() -> mockRestoreConsumer.assignment().size() == 0, | ||
"Never get the assignment"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix indentation and could you please also fix the typo? It should be Never got the assignment
.
Changes - Indentation fixes in the StoreChangelogReaderTest class - Removed restorationThreadId from the StreamThread, instead of it stateUpdaterId used since distinction between state updater ID and thread ID for restoration does not hold anymore. - Removed the `if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled)` from the pollPhase() method in the StreamThread since with state updater polling should use the configured poll time. - Merged the shouldPollWithRightTimeout test case with shouldPollWithRightTimeoutWithStateUpdaterDefault to shouldPollWithRightTimeout.
FYI: I will not be online for the next 1.5 weeks. |
…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag # Conflicts: # streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java # streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java # streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag # Conflicts: # streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@janchilling Is the PR ready for re-review? |
Hi @cadonna Not yet, I am having some university exams and assignments to complete, so could not focus on this for the past few weeks. But I'll make sure this is ready by the end of this week. Have a few questions also, some test cases in the StreamThreadTest is failing due to this line, https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java/#L252 . |
No worries! |
…t, StreamThreadTest, TaskManagerTest classes and TaskManager class. ProcessorStateManagerTest class changes : - Removed the use of ChangelogReader from the class, since it is not used in the StateUpdater enabled default flow - Removed ProcessorStateManagerTest#shouldNotRegisterNonLoggedStore and ProcessorStateManagerTest#shouldUnregisterChangelogsDuringClose, since ChangelogReader was removed. - Changed the ProcessorStateManagerTest#shouldRegisterNonPersistentStore and ProcessorStateManagerTest#shouldRegisterPersistentStores test cases to verify the registerStore method. TaskManagerTest class changes : - Removed test cases that verified the old code flow which used the TaskManager#handleAssignment with TaskManager#tryToCompleteRestoration since it follows the old code flow. TaskManager class changes : - Removed TaskManager#tryToCompleteRestoration method since it is a part of the old code flow without the StateUpdater. StreamThreadTest class changes : - Changes @MethodSource("data") to @valuesource(booleans = {true, false}) since only now true and false only needs to be passed to the test methods since state updater is default true. - Removed tests shouldOnlyCompleteShutdownAfterRebalanceNotInProgress, shouldUpdateStandbyTask, shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater and shouldNotUpdateStandbyTaskWhenPaused test cases since they needed the StateUpdater to be false. Note - testNamedTopologyWithStreamsProtocol and testStreamsRebalanceDataWithExtraCopartition test will fail since those verify an IllegalStateException when creating a Stream Thread. But during the StreamThread.create() method we create a StateUpdater thread regardless of whether a StreamThread is created or not. Therefore, during the teardown process, a StateUpdater thread is left hanging(This is a bug). Will correct this using a separate PR.
…KA-18913/Old-code-removal-gaurded-by-stateUpdaterEnabled-flag # Conflicts: # streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java # streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
Hi @cadonna , I think I have completed all, sorry for the delay though. Only one problem remains. Which is the I will create a separate PR to the above issue with the solution, I guess then we'll have to merge that PR before merging this. |
Hi @janchilling , Sorry for the long silence but I was quite busy recently. Yeah, I agree with you about the bug regarding the state updater thread that is not torn down after the Could you open a separate PR just for that? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @janchilling!
I really appreciate your patience and endurance!
Here my feedback!
Most of the tests in task manager test that you deleted need a rewrite instead of just deleting them.
.toList(); | ||
} | ||
|
||
private static Stream<Arguments> singleAndMultiTaskParameters() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name does not really fit anymore. I propose to rename this method to topologyComplexityAndRebalanceProtocol
.
streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class); | ||
streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestConsumerWrapper.class); | ||
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); | ||
streamsConfiguration.putAll(extraProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seem right. On line 472 the group protocol config is passed to props()
, but here it is ignored.
final Runnable shutdownErrorHook, | ||
final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) { | ||
|
||
final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is still a system test that uses the config. It is streams_upgrade_test.test_upgrade_downgrade_state_updater()
. There is a comment that says:
Once same-thread state restoration is removed from the code, this test
should use different versions of the code.
I guess it means to only use a version before 3.8
(e.g. LATEST_3_7
) for the from_version
and DEV_VERSION
for the to_version
. You need to choose a version before 3.8
because before 3.8
the state updater was not enabled by default.
@lucasbru did I correctly interpret your comment?
failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater()); | ||
} | ||
|
||
handleTasksWithStateUpdater( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please rename this method to handleTasks()
. We do not need to distinguish the cases with and without state updater.
tasks.removeTask(oldTask); | ||
tasks.addPendingTasksToInit(Collections.singleton(standbyTask)); | ||
} else { | ||
tasks.replaceActiveWithStandby(standbyTask); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method from the TaskRegistry
interface is not used anymore. Could you please remove it and its implementations?
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { | ||
// The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume() | ||
// is not called. This is not true when the state updater is enabled which leads to | ||
// java.lang.IllegalStateException: No current assignment for partition topic1-2. | ||
// Since this tests verifies an aspect that is independent from the state updater, it is OK to disable | ||
// the state updater and leave the rewriting of the test to later, when the code path for disabled state updater | ||
// is removed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe, you need to rewrite this test as the comment says. Let me know, if you need some help with that.
@@ -1851,29 +1736,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() | |||
} | |||
|
|||
@Test | |||
public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add the following test as a replacement for the this test:
@Test
public void shouldComputeOffsetSumForRunningStatefulTask() {
final StreamTask runningStatefulTask = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
when(runningStatefulTask.changelogOffsets())
.thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask)));
assertThat(
taskManager.taskOffsetSums(),
is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
);
}
@@ -1909,7 +1772,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro | |||
} | |||
|
|||
@Test | |||
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { | |||
public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please change
final long changelogOffsetOfRunningTask = 42L;
to
final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
to make the case more real?
@@ -1940,57 +1803,6 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat | |||
); | |||
} | |||
|
|||
@Test | |||
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please replace this test with the following:
@Test
public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final long changelogOffsetOfRestoringStandbyTask = 84L;
when(restoringStatefulTask.changelogOffsets())
.thenReturn(mkMap(
mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
assertThat(
taskManager.taskOffsetSums(),
is(mkMap(
mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
))
);
}
where
private final TopicPartition t1p1changelog2 = new TopicPartition("changelog2", 1);
@@ -2097,105 +1909,9 @@ public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception | |||
assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); | |||
} | |||
|
|||
@Test | |||
public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you cannot just delete all the tests that contain tryToCompleteRetoration()
. You need to rewrite them. Let me know if you need help.
Hey @janchilling are you still working on this? Just checking. |
@lucasbru Yes, they are. They were sidetracked by the following PR: #19889 (comment) |
…-removal-gaurded-by-stateUpdaterEnabled-flag # Conflicts: # streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java # streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
…n StreamThreadTest fix Updated the StreamThread#shouldOnlyCompleteShutdownAfterRebalanceNotInProgress test case and some other changes that were requested.
Hi @cadonna , I rewrote the Also, can you help me identify the test cases that needs a rewrite from the |
… and updated to test cases in the ProcessorStateManagerTest class StreamThreadTest changes : Removed two unused methods addStandbyRecordsToRestoreConsumer() and addActiveRecordsToRestoreConsumer() TaskManagerTest changes : Added methods shouldComputeOffsetSumForRunningStatefulTask() as a replacement for shouldReportLatestOffsetAsOffsetSumForRunningTask() and shouldSkipUnknownOffsetsWhenComputingOffsetSum() as a replacement for shouldSkipUnknownOffsetsWhenComputingOffsetSum(). TasksRegistry and Tasks changes : Removed unused method replaceActiveWithStandby() ProcessorStateManagerTest changes : Updated the method shouldUnregisterChangelogsDuringClose(), shouldRecycleStoreAndReregisterChangelog() to shouldCloseStateStoresOnStateManagerClose(), shouldRecycleAndReinitializeStore() respectively since the changelogReader was removed by the ProcessorStateManager.
Hi @cadonna , I kinda need help with updating the Test cases in the TaskManagerTest class. |
@janchilling Thank you for your patience! |
@janchilling -- Had a very brief look into this PR, and it is very large. I would propose to split it up, into multiple smaller PRs to allow us to make incremental progress. |
Closing this PR due to inactivity. It did become stale, and somebody else picked up this ticket now. |
…Thread#runOnceWithoutProcessingThreads flow.
Removed code of the method StreamThread#initializeAndRestorePhase, since
it was only used in the negation flow. Also removed the flag entirely
from the StreamThread#runOnceWithoutProcessingThreads method.
Will remove the flag and the related code entirely from the future
commits!