diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConfig.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConfig.java index 1f0adfa25be..2e30c78ffdb 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConfig.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConfig.java @@ -43,7 +43,9 @@ public class DistroConfig extends AbstractDynamicConfig { private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS; private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS; - + + private int verifyBatchSize = DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE; + private DistroConfig() { super(DISTRO); resetConfig(); @@ -65,6 +67,8 @@ protected void getConfigFromEnv() { DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS); loadDataTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_TIMEOUT_MILLISECONDS, Long.class, DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS); + verifyBatchSize = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_BATCH_SIZE, Integer.class, + DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE); } public static DistroConfig getInstance() { @@ -126,12 +130,20 @@ public long getLoadDataTimeoutMillis() { public void setLoadDataTimeoutMillis(long loadDataTimeoutMillis) { this.loadDataTimeoutMillis = loadDataTimeoutMillis; } - + + public int getVerifyBatchSize() { + return verifyBatchSize; + } + + public void setVerifyBatchSize(int verifyBatchSize) { + this.verifyBatchSize = verifyBatchSize; + } + @Override protected String printConfig() { return "DistroConfig{" + "syncDelayMillis=" + syncDelayMillis + ", syncTimeoutMillis=" + syncTimeoutMillis + ", syncRetryDelayMillis=" + syncRetryDelayMillis + ", verifyIntervalMillis=" + verifyIntervalMillis + ", verifyTimeoutMillis=" + verifyTimeoutMillis + ", loadDataRetryDelayMillis=" + loadDataRetryDelayMillis - + ", loadDataTimeoutMillis=" + loadDataTimeoutMillis + '}'; + + ", loadDataTimeoutMillis=" + loadDataTimeoutMillis + ", verifyBatchSize=" + verifyBatchSize + '}'; } } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConstants.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConstants.java index fca6572e2f9..0251d24eec6 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConstants.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/DistroConstants.java @@ -66,5 +66,9 @@ public class DistroConstants { public static final String DATA_LOAD_TIMEOUT_MILLISECONDS_STATE = "data_load_timeoutMs"; public static final long DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS = 30000L; + + public static final String DATA_VERIFY_BATCH_SIZE = "data_verify_batch_size"; + + public static final int DEFAULT_DATA_VERIFY_BATCH_SIZE = 100; } diff --git a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTask.java b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTask.java index 6ca0cc79b58..5152a0d7aa2 100644 --- a/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTask.java +++ b/core/src/main/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTask.java @@ -17,14 +17,18 @@ package com.alibaba.nacos.core.distributed.distro.task.verify; import com.alibaba.nacos.common.task.AbstractExecuteTask; +import com.alibaba.nacos.common.utils.RandomUtils; import com.alibaba.nacos.core.distributed.distro.component.DistroCallback; import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent; import com.alibaba.nacos.core.distributed.distro.entity.DistroData; import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecord; import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecordsHolder; +import com.alibaba.nacos.core.utils.GlobalExecutor; import com.alibaba.nacos.core.utils.Loggers; +import com.alibaba.nacos.core.distributed.distro.DistroConfig; import java.util.List; +import java.util.concurrent.TimeUnit; /** * Execute distro verify task. @@ -48,23 +52,35 @@ public DistroVerifyExecuteTask(DistroTransportAgent transportAgent, List batch = verifyData.subList(i, end); + requestBatch(batch); + } + } + + void requestBatch(List batch) { + GlobalExecutor.DISTRO_VERIFY_EXECUTOR.schedule((() -> { + for (DistroData each : batch) { + try { + if (transportAgent.supportCallbackTransport()) { + doSyncVerifyDataWithCallback(each); + } else { + doSyncVerifyData(each); + } + } catch (Throwable e) { + Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, + targetServer, e); } - } catch (Exception e) { - Loggers.DISTRO - .error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e); } - } + }), RandomUtils.nextLong(0, DistroConfig.getInstance().getVerifyIntervalMillis()), TimeUnit.MILLISECONDS); } - + private void doSyncVerifyDataWithCallback(DistroData data) { transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback()); } diff --git a/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java b/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java index 7559b326ccb..a0d9e7d630e 100644 --- a/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java +++ b/core/src/main/java/com/alibaba/nacos/core/utils/GlobalExecutor.java @@ -53,6 +53,10 @@ public class GlobalExecutor { EnvUtil.getAvailableProcessors(RemoteUtils.getRemoteExecutorTimesOfProcessors()), 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(RemoteUtils.getRemoteExecutorQueueSize()), new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-cluster-grpc-executor-%d").build()); + + public static final ScheduledExecutorService DISTRO_VERIFY_EXECUTOR = ExecutorFactory.Managed + .newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4, + new NameThreadFactory("com.alibaba.nacos.core.distro.verify")); public static void runWithoutThread(Runnable runnable) { runnable.run(); diff --git a/core/src/test/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTaskTest.java b/core/src/test/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTaskTest.java new file mode 100644 index 00000000000..1f76917c355 --- /dev/null +++ b/core/src/test/java/com/alibaba/nacos/core/distributed/distro/task/verify/DistroVerifyExecuteTaskTest.java @@ -0,0 +1,102 @@ +/* + * Copyright 1999-2018 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.nacos.core.distributed.distro.task.verify; + +import com.alibaba.nacos.core.distributed.distro.DistroConfig; +import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent; +import com.alibaba.nacos.core.distributed.distro.entity.DistroData; +import com.alibaba.nacos.core.distributed.distro.entity.DistroKey; +import com.alibaba.nacos.sys.env.EnvUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.MockitoAnnotations; +import org.springframework.mock.env.MockEnvironment; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; + +class DistroVerifyExecuteTaskTest { + + @InjectMocks + private DistroVerifyExecuteTask distroVerifyExecuteTask; + + @Mock + private DistroTransportAgent transportAgent; + + @Mock + private DistroConfig distroConfig; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + EnvUtil.setEnvironment(new MockEnvironment()); + // 初始化测试数据 + List verifyData = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + DistroKey key = new DistroKey("testKey" + i, "testResource"); + DistroData data = new DistroData(key, ("data" + i).getBytes()); + verifyData.add(data); + } + + // 创建被测任务实例 + distroVerifyExecuteTask = new DistroVerifyExecuteTask( + transportAgent, verifyData, "targetServer", "testResource" + ); + } + + @Test + void testRunWithSmallBatchSize() throws InterruptedException { + // 准备测试数据 + when(distroConfig.getVerifyBatchSize()).thenReturn(2); + when(distroConfig.getVerifyIntervalMillis()).thenReturn(50L); + try (MockedStatic mockedStatic = mockStatic(DistroConfig.class)) { + mockedStatic.when(DistroConfig::getInstance).thenReturn(distroConfig); + + // 执行测试 + distroVerifyExecuteTask.run(); + + // 验证全部被调用 + Thread.sleep(100L); + verify(transportAgent, times(10)).syncVerifyData(any(), any()); + } + } + + @Test + void testRunWithLargeBatchSize() throws InterruptedException { + // 准备测试数据 + when(distroConfig.getVerifyBatchSize()).thenReturn(20); + when(distroConfig.getVerifyIntervalMillis()).thenReturn(50L); + try (MockedStatic mockedStatic = mockStatic(DistroConfig.class)) { + mockedStatic.when(DistroConfig::getInstance).thenReturn(distroConfig); + + // 执行测试 + distroVerifyExecuteTask.run(); + // 验证全部被调用 + Thread.sleep(100L); + verify(transportAgent, times(10)).syncVerifyData(any(), any()); + } + } +}