@@ -152,6 +152,7 @@ public class TaskManagerTest {
152
152
private final TopicPartition t1p1 = new TopicPartition (topic1 , 1 );
153
153
private final TopicPartition t2p2 = new TopicPartition (topic2 , 1 );
154
154
private final TopicPartition t1p1changelog = new TopicPartition ("changelog" , 1 );
155
+ private final TopicPartition t1p1changelog2 = new TopicPartition ("changelog2" , 1 );
155
156
private final Set <TopicPartition > taskId01Partitions = Set .of (t1p1 );
156
157
private final Set <TopicPartition > taskId01ChangelogPartitions = Set .of (t1p1changelog );
157
158
private final Map <TaskId , Set <TopicPartition >> taskId01Assignment = singletonMap (taskId01 , taskId01Partitions );
@@ -218,6 +219,10 @@ public void setUp() {
218
219
taskManager = setUpTaskManager (StreamsConfigUtils .ProcessingMode .AT_LEAST_ONCE , null , false );
219
220
}
220
221
222
+ private TaskManager setUpTaskManager (final ProcessingMode processingMode , final TasksRegistry tasks ) {
223
+ return setUpTaskManager (processingMode , tasks , false );
224
+ }
225
+
221
226
private TaskManager setUpTaskManager (final ProcessingMode processingMode , final boolean stateUpdaterEnabled ) {
222
227
return setUpTaskManager (processingMode , null , stateUpdaterEnabled , false );
223
228
}
@@ -249,52 +254,6 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
249
254
return taskManager ;
250
255
}
251
256
252
- @ Test
253
- public void shouldClassifyExistingTasksWithoutStateUpdater () {
254
- final TaskManager taskManager = setUpTaskManager (ProcessingMode .AT_LEAST_ONCE , false );
255
- final Map <TaskId , Set <TopicPartition >> runningActiveTasks = mkMap (mkEntry (taskId01 , Set .of (t1p1 )));
256
- final Map <TaskId , Set <TopicPartition >> standbyTasks = mkMap (mkEntry (taskId02 , Set .of (t2p2 )));
257
- final Map <TaskId , Set <TopicPartition >> restoringActiveTasks = mkMap (mkEntry (taskId03 , Set .of (t1p3 )));
258
- final Map <TaskId , Set <TopicPartition >> activeTasks = new HashMap <>(runningActiveTasks );
259
- activeTasks .putAll (restoringActiveTasks );
260
- handleAssignment (runningActiveTasks , standbyTasks , restoringActiveTasks );
261
-
262
- taskManager .handleAssignment (activeTasks , standbyTasks );
263
-
264
- verifyNoInteractions (stateUpdater );
265
- }
266
-
267
- @ Test
268
- public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater () {
269
- final StandbyTask standbyTask = standbyTask (taskId03 , taskId03ChangelogPartitions )
270
- .inState (State .RUNNING )
271
- .withInputPartitions (taskId03Partitions ).build ();
272
- updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater (standbyTask , taskId03Partitions );
273
- verify (standbyTask , never ()).updateInputPartitions (eq (taskId03Partitions ), any ());
274
- }
275
-
276
- @ Test
277
- public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater () {
278
- final StandbyTask standbyTask = standbyTask (taskId03 , taskId03ChangelogPartitions )
279
- .inState (State .RUNNING )
280
- .withInputPartitions (taskId03Partitions ).build ();
281
- updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater (standbyTask , taskId04Partitions );
282
- verify (standbyTask ).updateInputPartitions (eq (taskId04Partitions ), any ());
283
- }
284
-
285
- private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater (final Task standbyTask ,
286
- final Set <TopicPartition > newInputPartition ) {
287
- final TasksRegistry tasks = mock (TasksRegistry .class );
288
- when (tasks .allTasks ()).thenReturn (Set .of (standbyTask ));
289
- final TaskManager taskManager = setUpTaskManager (ProcessingMode .AT_LEAST_ONCE , tasks , false );
290
-
291
- taskManager .handleAssignment (
292
- Collections .emptyMap (),
293
- mkMap (mkEntry (standbyTask .id (), newInputPartition ))
294
- );
295
-
296
- verify (standbyTask ).resume ();
297
- }
298
257
299
258
@ Test
300
259
public void shouldLockAllTasksOnCorruptionWithProcessingThreads () {
@@ -1853,14 +1812,20 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater()
1853
1812
}
1854
1813
1855
1814
@ Test
1856
- public void shouldReportLatestOffsetAsOffsetSumForRunningTask () throws Exception {
1857
- final Map <TopicPartition , Long > changelogOffsets = mkMap (
1858
- mkEntry (new TopicPartition ("changelog" , 0 ), Task .LATEST_OFFSET ),
1859
- mkEntry (new TopicPartition ("changelog" , 1 ), Task .LATEST_OFFSET )
1860
- );
1861
- final Map <TaskId , Long > expectedOffsetSums = mkMap (mkEntry (taskId00 , Task .LATEST_OFFSET ));
1815
+ public void shouldComputeOffsetSumForRunningStatefulTask () {
1816
+ final StreamTask runningStatefulTask = statefulTask (taskId00 , taskId00ChangelogPartitions )
1817
+ .inState (State .RUNNING ).build ();
1818
+ final long changelogOffsetOfRunningTask = Task .LATEST_OFFSET ;
1819
+ when (runningStatefulTask .changelogOffsets ())
1820
+ .thenReturn (mkMap (mkEntry (t1p0changelog , changelogOffsetOfRunningTask )));
1821
+ final TasksRegistry tasks = mock (TasksRegistry .class );
1822
+ final TaskManager taskManager = setUpTaskManager (ProcessingMode .AT_LEAST_ONCE , tasks );
1823
+ when (tasks .allTasksPerId ()).thenReturn (mkMap (mkEntry (taskId00 , runningStatefulTask )));
1862
1824
1863
- computeOffsetSumAndVerify (changelogOffsets , expectedOffsetSums );
1825
+ assertThat (
1826
+ taskManager .taskOffsetSums (),
1827
+ is (mkMap (mkEntry (taskId00 , changelogOffsetOfRunningTask )))
1828
+ );
1864
1829
}
1865
1830
1866
1831
@ Test
@@ -1911,14 +1876,14 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro
1911
1876
}
1912
1877
1913
1878
@ Test
1914
- public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater () {
1879
+ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask () {
1915
1880
final StreamTask runningStatefulTask = statefulTask (taskId00 , taskId00ChangelogPartitions )
1916
1881
.inState (State .RUNNING ).build ();
1917
1882
final StreamTask restoringStatefulTask = statefulTask (taskId01 , taskId01ChangelogPartitions )
1918
1883
.inState (State .RESTORING ).build ();
1919
1884
final StandbyTask restoringStandbyTask = standbyTask (taskId02 , taskId02ChangelogPartitions )
1920
1885
.inState (State .RUNNING ).build ();
1921
- final long changelogOffsetOfRunningTask = 42L ;
1886
+ final long changelogOffsetOfRunningTask = Task . LATEST_OFFSET ;
1922
1887
final long changelogOffsetOfRestoringStatefulTask = 24L ;
1923
1888
final long changelogOffsetOfRestoringStandbyTask = 84L ;
1924
1889
when (runningStatefulTask .changelogOffsets ())
@@ -1943,14 +1908,26 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat
1943
1908
}
1944
1909
1945
1910
@ Test
1946
- public void shouldSkipUnknownOffsetsWhenComputingOffsetSum () throws Exception {
1947
- final Map <TopicPartition , Long > changelogOffsets = mkMap (
1948
- mkEntry (new TopicPartition ("changelog" , 0 ), OffsetCheckpoint .OFFSET_UNKNOWN ),
1949
- mkEntry (new TopicPartition ("changelog" , 1 ), 10L )
1950
- );
1951
- final Map <TaskId , Long > expectedOffsetSums = mkMap (mkEntry (taskId00 , 10L ));
1911
+ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum () {
1912
+ final StreamTask restoringStatefulTask = statefulTask (taskId01 , taskId01ChangelogPartitions )
1913
+ .inState (State .RESTORING ).build ();
1914
+ final long changelogOffsetOfRestoringStandbyTask = 84L ;
1915
+ when (restoringStatefulTask .changelogOffsets ())
1916
+ .thenReturn (mkMap (
1917
+ mkEntry (t1p1changelog , changelogOffsetOfRestoringStandbyTask ),
1918
+ mkEntry (t1p1changelog2 , OffsetCheckpoint .OFFSET_UNKNOWN )
1919
+ ));
1920
+ final TasksRegistry tasks = mock (TasksRegistry .class );
1921
+ final TaskManager taskManager = setUpTaskManager (ProcessingMode .AT_LEAST_ONCE , tasks );
1922
+ when (tasks .allTasksPerId ()).thenReturn (mkMap (mkEntry (taskId01 , restoringStatefulTask )));
1923
+ when (stateUpdater .tasks ()).thenReturn (Set .of (restoringStatefulTask ));
1952
1924
1953
- computeOffsetSumAndVerify (changelogOffsets , expectedOffsetSums );
1925
+ assertThat (
1926
+ taskManager .taskOffsetSums (),
1927
+ is (mkMap (
1928
+ mkEntry (taskId01 , changelogOffsetOfRestoringStandbyTask )
1929
+ ))
1930
+ );
1954
1931
}
1955
1932
1956
1933
private void computeOffsetSumAndVerify (final Map <TopicPartition , Long > changelogOffsets ,
0 commit comments