Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class DistroConfig extends AbstractDynamicConfig {
private long loadDataRetryDelayMillis = DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS;

private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS;


private int verifyBatchSize = DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

新增的verifyBatchSize字段缺少配置说明注释

🟡 Major | 🧹 Code Smells

📋 问题详情

在DistroConfig类中新增的verifyBatchSize字段未添加配置说明注释,导致其他开发者无法明确该配置项的用途和合理取值范围。根据代码标准化规范,所有配置项应包含清晰的注释说明其业务含义和默认值。

💡 解决方案

在字段前添加配置说明注释:

+    /**
+     * 验证任务数据分批次处理的批次大小,默认值为100
+     * 该参数用于控制每次网络验证请求的数据量,避免单次请求过大导致内存溢出
+     */
🔧 建议代码

‼️AI 生成代码 - 请在应用前检查逻辑、规范并测试

Suggested change
private int verifyBatchSize = DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE;
/**
* 验证任务数据分批次处理的批次大小默认值为100
* 该参数用于控制每次网络验证请求的数据量避免单次请求过大导致内存溢出
*/
private int verifyBatchSize = DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE;

您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)

有用意见👍无用意见👎错误意见❌


private DistroConfig() {
super(DISTRO);
resetConfig();
Expand All @@ -65,6 +67,8 @@ protected void getConfigFromEnv() {
DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS);
loadDataTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_TIMEOUT_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS);
verifyBatchSize = EnvUtil.getProperty(DistroConstants.DATA_VERIFY_BATCH_SIZE, Integer.class,
DistroConstants.DEFAULT_DATA_VERIFY_BATCH_SIZE);
}

public static DistroConfig getInstance() {
Expand Down Expand Up @@ -126,12 +130,20 @@ public long getLoadDataTimeoutMillis() {
public void setLoadDataTimeoutMillis(long loadDataTimeoutMillis) {
this.loadDataTimeoutMillis = loadDataTimeoutMillis;
}


public int getVerifyBatchSize() {
return verifyBatchSize;
}

public void setVerifyBatchSize(int verifyBatchSize) {
this.verifyBatchSize = verifyBatchSize;
}

@Override
protected String printConfig() {
return "DistroConfig{" + "syncDelayMillis=" + syncDelayMillis + ", syncTimeoutMillis=" + syncTimeoutMillis
+ ", syncRetryDelayMillis=" + syncRetryDelayMillis + ", verifyIntervalMillis=" + verifyIntervalMillis
+ ", verifyTimeoutMillis=" + verifyTimeoutMillis + ", loadDataRetryDelayMillis=" + loadDataRetryDelayMillis
+ ", loadDataTimeoutMillis=" + loadDataTimeoutMillis + '}';
+ ", loadDataTimeoutMillis=" + loadDataTimeoutMillis + ", verifyBatchSize=" + verifyBatchSize + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,9 @@ public class DistroConstants {
public static final String DATA_LOAD_TIMEOUT_MILLISECONDS_STATE = "data_load_timeoutMs";

public static final long DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS = 30000L;

public static final String DATA_VERIFY_BATCH_SIZE = "data_verify_batch_size";

public static final int DEFAULT_DATA_VERIFY_BATCH_SIZE = 100;

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package com.alibaba.nacos.core.distributed.distro.task.verify;

import com.alibaba.nacos.common.task.AbstractExecuteTask;
import com.alibaba.nacos.common.utils.RandomUtils;
import com.alibaba.nacos.core.distributed.distro.component.DistroCallback;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecord;
import com.alibaba.nacos.core.distributed.distro.monitor.DistroRecordsHolder;
import com.alibaba.nacos.core.utils.GlobalExecutor;
import com.alibaba.nacos.core.utils.Loggers;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Execute distro verify task.
Expand All @@ -48,23 +52,35 @@ public DistroVerifyExecuteTask(DistroTransportAgent transportAgent, List<DistroD
this.targetServer = targetServer;
this.resourceType = resourceType;
}

@Override
public void run() {
for (DistroData each : verifyData) {
try {
if (transportAgent.supportCallbackTransport()) {
doSyncVerifyDataWithCallback(each);
} else {
doSyncVerifyData(each);
int batchSize = DistroConfig.getInstance().getVerifyBatchSize();
int totalSize = verifyData.size();
for (int i = 0; i < totalSize; i += batchSize) {
int end = Math.min(i + batchSize, totalSize);
List<DistroData> batch = verifyData.subList(i, end);
requestBatch(batch);
}
}

void requestBatch(List<DistroData> batch) {
GlobalExecutor.DISTRO_VERIFY_EXECUTOR.schedule((() -> {
for (DistroData each : batch) {
try {
if (transportAgent.supportCallbackTransport()) {
doSyncVerifyDataWithCallback(each);
} else {
doSyncVerifyData(each);
}
} catch (Throwable e) {
Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType,
targetServer, e);
}
} catch (Exception e) {
Loggers.DISTRO
.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);
}
}
}), RandomUtils.nextLong(0, DistroConfig.getInstance().getVerifyIntervalMillis()), TimeUnit.MILLISECONDS);
}

private void doSyncVerifyDataWithCallback(DistroData data) {
transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public class GlobalExecutor {
EnvUtil.getAvailableProcessors(RemoteUtils.getRemoteExecutorTimesOfProcessors()), 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(RemoteUtils.getRemoteExecutorQueueSize()),
new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-cluster-grpc-executor-%d").build());

public static final ScheduledExecutorService DISTRO_VERIFY_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4,
new NameThreadFactory("com.alibaba.nacos.core.distro.verify"));
Comment on lines +57 to +59
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

新增的DISTRO_VERIFY_EXECUTOR线程池未配置核心线程保活策略

🟡 Major | 📝 Performance optimization

📋 问题详情

新创建的DISTRO_VERIFY_EXECUTOR线程池未设置核心线程保活时间,可能导致线程频繁创建/销毁。根据性能优化规范,应为ScheduledExecutorService配置合理的keepAliveTime参数以提升线程复用效率。

💡 解决方案

添加keepAliveTime参数配置:

-            .newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4,
+            .newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4, 60L, TimeUnit.SECONDS,
🔧 建议代码

‼️AI 生成代码 - 请在应用前检查逻辑、规范并测试

Suggested change
public static final ScheduledExecutorService DISTRO_VERIFY_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4,
new NameThreadFactory("com.alibaba.nacos.core.distro.verify"));
public static final ScheduledExecutorService DISTRO_VERIFY_EXECUTOR = ExecutorFactory.Managed
.newScheduledExecutorService(ClassUtils.getCanonicalName(GlobalExecutor.class), 4, 60L, TimeUnit.SECONDS,
new NameThreadFactory("com.alibaba.nacos.core.distro.verify"));

您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)

有用意见👍无用意见👎错误意见❌


public static void runWithoutThread(Runnable runnable) {
runnable.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.alibaba.nacos.core.distributed.distro.task.verify;

import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.core.distributed.distro.component.DistroTransportAgent;
import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
import com.alibaba.nacos.sys.env.EnvUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.MockitoAnnotations;
import org.springframework.mock.env.MockEnvironment;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;

class DistroVerifyExecuteTaskTest {

@InjectMocks
private DistroVerifyExecuteTask distroVerifyExecuteTask;

@Mock
private DistroTransportAgent transportAgent;

@Mock
private DistroConfig distroConfig;

@BeforeEach
void setUp() {
MockitoAnnotations.openMocks(this);
EnvUtil.setEnvironment(new MockEnvironment());
// 初始化测试数据
List<DistroData> verifyData = new ArrayList<>();
for (int i = 0; i < 10; i++) {
DistroKey key = new DistroKey("testKey" + i, "testResource");
DistroData data = new DistroData(key, ("data" + i).getBytes());
verifyData.add(data);
}

// 创建被测任务实例
distroVerifyExecuteTask = new DistroVerifyExecuteTask(
transportAgent, verifyData, "targetServer", "testResource"
);
}

@Test
void testRunWithSmallBatchSize() throws InterruptedException {
// 准备测试数据
when(distroConfig.getVerifyBatchSize()).thenReturn(2);
when(distroConfig.getVerifyIntervalMillis()).thenReturn(50L);
try (MockedStatic<DistroConfig> mockedStatic = mockStatic(DistroConfig.class)) {
mockedStatic.when(DistroConfig::getInstance).thenReturn(distroConfig);

// 执行测试
distroVerifyExecuteTask.run();

// 验证全部被调用
Thread.sleep(100L);
verify(transportAgent, times(10)).syncVerifyData(any(), any());
}
}
Comment on lines +70 to +85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

单元测试未验证配置边界条件

🟠 Critical | 📝 Unit test coverage

📋 问题详情

测试用例未覆盖verifyBatchSize=0的异常配置场景。根据单元测试规范,应添加验证配置有效性(如最小值检查)的测试用例,确保系统在非法配置下能正确处理。

💡 解决方案

添加边界值测试用例:

+    @Test(expected = IllegalArgumentException.class)
+    void testInvalidBatchSize() {
+        when(distroConfig.getVerifyBatchSize()).thenReturn(0);
+        mockedStatic.when(DistroConfig::getInstance).thenReturn(distroConfig);
+        distroVerifyExecuteTask.run();
+    }

您的反馈对我们很重要!(建议右键在新标签页中打开以下链接)

有用意见👍无用意见👎错误意见❌


@Test
void testRunWithLargeBatchSize() throws InterruptedException {
// 准备测试数据
when(distroConfig.getVerifyBatchSize()).thenReturn(20);
when(distroConfig.getVerifyIntervalMillis()).thenReturn(50L);
try (MockedStatic<DistroConfig> mockedStatic = mockStatic(DistroConfig.class)) {
mockedStatic.when(DistroConfig::getInstance).thenReturn(distroConfig);

// 执行测试
distroVerifyExecuteTask.run();
// 验证全部被调用
Thread.sleep(100L);
verify(transportAgent, times(10)).syncVerifyData(any(), any());
}
}
}
Loading