Skip to content

Commit 2d7497d

Browse files
committed
MINOR: Revert "KAFKA-18913: Start state updater in task manager (apache#19889)"
This reverts commit 4d6cf3e. It seemed to trigger a race condition in the state updater initialization.
1 parent 05f012c commit 2d7497d

File tree

3 files changed

+11
-43
lines changed

3 files changed

+11
-43
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
454454
final DefaultTaskManager schedulingTaskManager =
455455
maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
456456
final StateUpdater stateUpdater =
457-
maybeCreateStateUpdater(
457+
maybeCreateAndStartStateUpdater(
458458
stateUpdaterEnabled,
459459
streamsMetrics,
460460
config,
@@ -633,7 +633,7 @@ private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean
633633
return null;
634634
}
635635

636-
private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEnabled,
636+
private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled,
637637
final StreamsMetricsImpl streamsMetrics,
638638
final StreamsConfig streamsConfig,
639639
final Consumer<byte[], byte[]> restoreConsumer,
@@ -644,7 +644,7 @@ private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEn
644644
final int threadIdx) {
645645
if (stateUpdaterEnabled) {
646646
final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
647-
return new DefaultStateUpdater(
647+
final StateUpdater stateUpdater = new DefaultStateUpdater(
648648
name,
649649
streamsMetrics.metricsRegistry(),
650650
streamsConfig,
@@ -653,6 +653,8 @@ private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEn
653653
topologyMetadata,
654654
time
655655
);
656+
stateUpdater.start();
657+
return stateUpdater;
656658
} else {
657659
return null;
658660
}
@@ -881,9 +883,6 @@ public void run() {
881883
}
882884
boolean cleanRun = false;
883885
try {
884-
if (stateUpdaterEnabled) {
885-
taskManager.init();
886-
}
887886
cleanRun = runLoop();
888887
} catch (final Throwable e) {
889888
failedStreamThreadSensor.record();

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,6 @@ public class TaskManager {
149149
);
150150
}
151151

152-
void init() {
153-
if (stateUpdater != null) {
154-
this.stateUpdater.start();
155-
}
156-
}
157152
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
158153
this.mainConsumer = mainConsumer;
159154
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@
109109
import org.junit.jupiter.params.ParameterizedTest;
110110
import org.junit.jupiter.params.provider.Arguments;
111111
import org.junit.jupiter.params.provider.MethodSource;
112-
import org.junit.jupiter.params.provider.ValueSource;
113112
import org.mockito.InOrder;
114113
import org.mockito.Mock;
115114
import org.mockito.Mockito;
@@ -917,7 +916,6 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b
917916
thread = createStreamThread(CLIENT_ID, config);
918917

919918
thread.setState(StreamThread.State.STARTING);
920-
thread.taskManager().init();
921919
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
922920

923921
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -1291,7 +1289,6 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
12911289
thread = createStreamThread(CLIENT_ID, new StreamsConfig(props));
12921290

12931291
thread.setState(StreamThread.State.STARTING);
1294-
thread.taskManager().init();
12951292
thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
12961293

12971294
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1549,7 +1546,6 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr
15491546
consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
15501547

15511548
thread.setState(StreamThread.State.STARTING);
1552-
thread.taskManager().init();
15531549
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
15541550

15551551
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1615,7 +1611,6 @@ private void testThrowingDurringCommitTransactionException(final RuntimeExceptio
16151611
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
16161612

16171613
thread.setState(StreamThread.State.STARTING);
1618-
thread.taskManager().init();
16191614
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
16201615

16211616
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1698,7 +1693,6 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
16981693
internalTopologyBuilder.buildTopology();
16991694

17001695
thread.setState(StreamThread.State.STARTING);
1701-
thread.taskManager().init();
17021696
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
17031697

17041698
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1793,7 +1787,6 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm
17931787
consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null)));
17941788

17951789
thread.setState(StreamThread.State.STARTING);
1796-
thread.taskManager().init();
17971790
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
17981791

17991792
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1858,7 +1851,6 @@ public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterE
18581851
internalTopologyBuilder.addSink("out", "output", null, null, null, "name");
18591852

18601853
thread.setState(StreamThread.State.STARTING);
1861-
thread.taskManager().init();
18621854
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
18631855

18641856
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1940,7 +1932,6 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU
19401932
);
19411933

19421934
thread.setState(StreamThread.State.STARTING);
1943-
thread.taskManager().init();
19441935
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
19451936

19461937
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -2001,7 +1992,6 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean state
20011992
restoreConsumer.updateBeginningOffsets(offsets);
20021993

20031994
thread.setState(StreamThread.State.STARTING);
2004-
thread.taskManager().init();
20051995
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
20061996

20071997
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -2265,7 +2255,6 @@ public void process(final Record<Object, Object> record) {}
22652255
thread = createStreamThread(CLIENT_ID, config);
22662256

22672257
thread.setState(StreamThread.State.STARTING);
2268-
thread.taskManager().init();
22692258
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
22702259
final List<TopicPartition> assignedPartitions = new ArrayList<>();
22712260

@@ -2345,7 +2334,6 @@ public void process(final Record<Object, Object> record) {}
23452334
thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled);
23462335

23472336
thread.setState(StreamThread.State.STARTING);
2348-
thread.taskManager().init();
23492337
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
23502338
final List<TopicPartition> assignedPartitions = new ArrayList<>();
23512339

@@ -2543,7 +2531,6 @@ public void shouldLogAndRecordSkippedMetricForDeserializationException(final boo
25432531
thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties));
25442532

25452533
thread.setState(StreamThread.State.STARTING);
2546-
thread.taskManager().init();
25472534
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
25482535

25492536
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3030,7 +3017,6 @@ public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps(
30303017
thread = createStreamThread(CLIENT_ID, config);
30313018

30323019
thread.setState(StreamThread.State.STARTING);
3033-
thread.taskManager().init();
30343020
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
30353021

30363022
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3404,7 +3390,6 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea
34043390

34053391
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
34063392
thread.setState(State.STARTING);
3407-
thread.taskManager().init();
34083393

34093394
final Map<String, KafkaFuture<Uuid>> clientInstanceIdFutures = thread.clientInstanceIds(Duration.ZERO);
34103395

@@ -3429,7 +3414,6 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea
34293414
public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
34303415
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
34313416
thread.setState(State.STARTING);
3432-
thread.taskManager().init();
34333417

34343418
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
34353419

@@ -3446,7 +3430,6 @@ public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolea
34463430
public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
34473431
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
34483432
thread.setState(State.STARTING);
3449-
thread.taskManager().init();
34503433

34513434
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
34523435

@@ -3463,7 +3446,6 @@ public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boo
34633446
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
34643447
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
34653448
thread.setState(State.STARTING);
3466-
thread.taskManager().init();
34673449

34683450
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
34693451

@@ -3481,7 +3463,6 @@ public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateU
34813463
clientSupplier.consumer.disableTelemetry();
34823464
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
34833465
thread.setState(State.STARTING);
3484-
thread.taskManager().init();
34853466

34863467
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
34873468

@@ -3499,7 +3480,6 @@ public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean sta
34993480

35003481
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
35013482
thread.setState(State.STARTING);
3502-
thread.taskManager().init();
35033483

35043484
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
35053485

@@ -3519,7 +3499,6 @@ public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdat
35193499

35203500
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
35213501
thread.setState(State.STARTING);
3522-
thread.taskManager().init();
35233502

35243503
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
35253504

@@ -3537,7 +3516,6 @@ public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnab
35373516
clientSupplier.consumer.injectTimeoutException(-1);
35383517
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
35393518
thread.setState(State.STARTING);
3540-
thread.taskManager().init();
35413519

35423520
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
35433521

@@ -3562,7 +3540,6 @@ public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterE
35623540
clientSupplier.restoreConsumer.injectTimeoutException(-1);
35633541
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
35643542
thread.setState(State.STARTING);
3565-
thread.taskManager().init();
35663543

35673544
final Map<String, KafkaFuture<Uuid>> consumerFutures = thread.clientInstanceIds(Duration.ZERO);
35683545

@@ -3590,7 +3567,6 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled,
35903567

35913568
thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled);
35923569
thread.setState(State.STARTING);
3593-
thread.taskManager().init();
35943570

35953571
final Map<String, KafkaFuture<Uuid>> producerFutures = thread.clientInstanceIds(Duration.ZERO);
35963572

@@ -3607,10 +3583,9 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled,
36073583
);
36083584
}
36093585

3610-
@ParameterizedTest
3611-
@ValueSource(booleans = {true, false})
3612-
public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabled) {
3613-
final Properties props = configProps(false, stateUpdaterEnabled, false);
3586+
@Test
3587+
public void testNamedTopologyWithStreamsProtocol() {
3588+
final Properties props = configProps(false, false, false);
36143589
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString());
36153590
final StreamsConfig config = new StreamsConfig(props);
36163591
final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder(
@@ -3667,10 +3642,9 @@ public void testStreamsRebalanceDataWithClassicProtocol() {
36673642
assertTrue(thread.streamsRebalanceData().isEmpty());
36683643
}
36693644

3670-
@ParameterizedTest
3671-
@ValueSource(booleans = {true, false})
3672-
public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpdaterEnabled) {
3673-
final Properties props = configProps(false, stateUpdaterEnabled, false);
3645+
@Test
3646+
public void testStreamsRebalanceDataWithExtraCopartition() {
3647+
final Properties props = configProps(false, false, false);
36743648
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString());
36753649

36763650
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

0 commit comments

Comments
 (0)