diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java index 8481efa6289..84c21ad96ab 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolder.java @@ -58,7 +58,7 @@ public class ServiceInfoHolder implements Closeable { private String notifierEventScope; private boolean enableClientMetrics = true; - + public ServiceInfoHolder(String namespace, String notifierEventScope, NacosClientProperties properties) { cacheDir = CacheDirUtil.initCacheDir(namespace, properties); instancesDiffer = new InstancesDiffer(); @@ -109,10 +109,10 @@ public ServiceInfo getServiceInfo(final String serviceName, final String groupNa * @param json service json * @return service info */ - public ServiceInfo processServiceInfo(String json) { + public ServiceInfo processServiceInfo(String json, boolean forceNotify) { ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class); serviceInfo.setJsonFromServer(json); - return processServiceInfo(serviceInfo); + return processServiceInfo(serviceInfo, forceNotify); } /** @@ -121,7 +121,7 @@ public ServiceInfo processServiceInfo(String json) { * @param serviceInfo new service info * @return service info */ - public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { + public ServiceInfo processServiceInfo(ServiceInfo serviceInfo, boolean forceNotify) { String serviceKey = serviceInfo.getKeyWithoutClusters(); if (serviceKey == null) { NAMING_LOGGER.warn("process service info but serviceKey is null, service host: {}", @@ -133,6 +133,12 @@ public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { //empty or error push, just ignore NAMING_LOGGER.warn("process service info but found empty or error push, serviceKey: {}, " + "pushEmptyProtection: {}, hosts: {}", serviceKey, pushEmptyProtection, serviceInfo.getHosts()); + if (forceNotify) { + InstancesDiff diff = getServiceInfoDiff(null, oldService); + NotifyCenter.publishEvent( + new InstancesChangeEvent(notifierEventScope, oldService.getName(), oldService.getGroupName(), + oldService.getClusters(), oldService.getHosts(), diff)); + } return oldService; } serviceInfoMap.put(serviceKey, serviceInfo); @@ -140,7 +146,7 @@ public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) { serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo)); } - + if (enableClientMetrics) { try { MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); @@ -148,8 +154,8 @@ public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) { NAMING_LOGGER.error("Failed to update metrics for service info map size", t); } } - - if (diff.hasDifferent()) { + + if (diff.hasDifferent() || forceNotify) { NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceKey, JacksonUtils.toJson(serviceInfo.getHosts())); diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java index 107b53693be..68d55546f88 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/core/ServiceInfoUpdateService.java @@ -193,7 +193,7 @@ public void run() { ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey); if (serviceObj == null) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, false); - serviceInfoHolder.processServiceInfo(serviceObj); + serviceInfoHolder.processServiceInfo(serviceObj, false); // TODO multiple time can be configured. delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE; lastRefTime = serviceObj.getLastRefTime(); @@ -202,7 +202,7 @@ public void run() { if (serviceObj.getLastRefTime() <= lastRefTime) { serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, false); - serviceInfoHolder.processServiceInfo(serviceObj); + serviceInfoHolder.processServiceInfo(serviceObj, false); } lastRefTime = serviceObj.getLastRefTime(); if (CollectionUtils.isEmpty(serviceObj.getHosts())) { diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java index 28bd711ee2f..14a21736ee3 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegate.java @@ -172,7 +172,7 @@ public ServiceInfo subscribe(String serviceName, String groupName, String cluste if (null == result || !isSubscribed(serviceName, groupName, clusters)) { result = grpcClientProxy.subscribe(serviceName, groupName, clusters); } - serviceInfoHolder.processServiceInfo(result); + serviceInfoHolder.processServiceInfo(result, true); return result; } diff --git a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java index 83c828c802d..776eba0d182 100644 --- a/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java +++ b/client/src/main/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandler.java @@ -41,7 +41,7 @@ public NamingPushRequestHandler(ServiceInfoHolder serviceInfoHolder) { public Response requestReply(Request request, Connection connection) { if (request instanceof NotifySubscriberRequest) { NotifySubscriberRequest notifyRequest = (NotifySubscriberRequest) request; - serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo()); + serviceInfoHolder.processServiceInfo(notifyRequest.getServiceInfo(), false); return new NotifySubscriberResponse(); } return null; diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java index bb3fc6ca3a5..045287f2027 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/cache/ServiceInfoHolderTest.java @@ -85,7 +85,7 @@ void testProcessServiceInfo() { hosts.add(instance2); info.setHosts(hosts); - ServiceInfo actual1 = holder.processServiceInfo(info); + ServiceInfo actual1 = holder.processServiceInfo(info, false); assertEquals(info, actual1); Instance newInstance1 = createInstance("1.1.1.1", 1); @@ -97,7 +97,7 @@ void testProcessServiceInfo() { ServiceInfo info2 = new ServiceInfo("a@@b@@c"); info2.setHosts(hosts2); - ServiceInfo actual2 = holder.processServiceInfo(info2); + ServiceInfo actual2 = holder.processServiceInfo(info2, false); assertEquals(info2, actual2); } @@ -116,7 +116,7 @@ void testProcessServiceInfoEnableClientMetricsTrue() { try (MockedStatic mockedMetricsMonitor = Mockito.mockStatic(MetricsMonitor.class)) { mockedMetricsMonitor.when(MetricsMonitor::getServiceInfoMapSizeMonitor).thenReturn(mockGaugeChild); - holder.processServiceInfo(info); + holder.processServiceInfo(info, false); verify(mockGaugeChild, times(1)).set(1); } @@ -134,7 +134,7 @@ void testProcessServiceInfoEnableClientMetricsFalse() { info.setHosts(hosts); try (MockedStatic mockedMetricsMonitor = Mockito.mockStatic(MetricsMonitor.class)) { - holder.processServiceInfo(info); + holder.processServiceInfo(info, false); mockedMetricsMonitor.verify(MetricsMonitor::getServiceInfoMapSizeMonitor, never()); } @@ -156,7 +156,7 @@ void testProcessServiceInfoEnableClientMetricsNotSet() { try (MockedStatic mockedMetricsMonitor = Mockito.mockStatic(MetricsMonitor.class)) { mockedMetricsMonitor.when(MetricsMonitor::getServiceInfoMapSizeMonitor).thenReturn(mockGaugeChild); - holder.processServiceInfo(info); + holder.processServiceInfo(info, false); verify(mockGaugeChild, times(1)).set(1); } @@ -180,7 +180,7 @@ void testProcessServiceInfoSetThrowsException() { mockedMetricsMonitor.when(MetricsMonitor::getServiceInfoMapSizeMonitor).thenReturn(mockGaugeChild); doThrow(exception).when(mockGaugeChild).set(anyInt()); - ServiceInfo actual2 = holder.processServiceInfo(info); + ServiceInfo actual2 = holder.processServiceInfo(info, false); assertEquals(info, actual2); } @@ -208,7 +208,7 @@ private Instance createInstance(String ip, int port) { void testProcessServiceInfo2() { String json = "{\"groupName\":\"a\",\"name\":\"b\",\"clusters\":\"c\"}"; - ServiceInfo actual = holder.processServiceInfo(json); + ServiceInfo actual = holder.processServiceInfo(json, false); ServiceInfo expect = new ServiceInfo("a@@b@@c"); expect.setJsonFromServer(json); assertEquals(expect.getKey(), actual.getKey()); @@ -227,11 +227,11 @@ void testProcessServiceInfoWithPushEmpty() throws NacosException { nacosClientProperties.setProperty(PropertyKeyConst.NAMING_PUSH_EMPTY_PROTECTION, "true"); holder.shutdown(); holder = new ServiceInfoHolder("aa", "scope-001", nacosClientProperties); - holder.processServiceInfo(oldInfo); + holder.processServiceInfo(oldInfo, false); ServiceInfo newInfo = new ServiceInfo("a@@b@@c"); - final ServiceInfo actual = holder.processServiceInfo(newInfo); + final ServiceInfo actual = holder.processServiceInfo(newInfo, false); assertEquals(oldInfo.getKey(), actual.getKey()); assertEquals(2, actual.getHosts().size()); @@ -239,7 +239,7 @@ void testProcessServiceInfoWithPushEmpty() throws NacosException { @Test void testProcessNullServiceInfo() { - assertNull(holder.processServiceInfo(new ServiceInfo())); + assertNull(holder.processServiceInfo(new ServiceInfo(), false)); } @Test @@ -252,10 +252,10 @@ void testProcessServiceInfoForOlder() { hosts.add(instance2); info.setHosts(hosts); info.setLastRefTime(System.currentTimeMillis()); - holder.processServiceInfo(info); + holder.processServiceInfo(info, false); ServiceInfo olderInfo = new ServiceInfo("a@@b@@c"); olderInfo.setLastRefTime(0L); - final ServiceInfo actual = holder.processServiceInfo(olderInfo); + final ServiceInfo actual = holder.processServiceInfo(olderInfo, false); assertEquals(olderInfo, actual); } @@ -267,7 +267,7 @@ void testGetServiceInfo() { hosts.add(instance1); info.setHosts(hosts); - ServiceInfo expect = holder.processServiceInfo(info); + ServiceInfo expect = holder.processServiceInfo(info, false); String serviceName = "b"; String groupName = "a"; ServiceInfo actual = holder.getServiceInfo(serviceName, groupName); diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegateTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegateTest.java index c2caa3d48ab..dbf138f5994 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegateTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/NamingClientProxyDelegateTest.java @@ -298,7 +298,7 @@ void testSubscribe() throws NacosException { ServiceInfo actual = delegate.subscribe(serviceName, groupName, clusters); assertEquals(info, actual); verify(mockGrpcClient, times(1)).subscribe(serviceName, groupName, clusters); - verify(holder, times(1)).processServiceInfo(info); + verify(holder, times(1)).processServiceInfo(info, false); } diff --git a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java index 40855f67ae6..34bcd7e6a44 100644 --- a/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java +++ b/client/src/test/java/com/alibaba/nacos/client/naming/remote/gprc/NamingPushRequestHandlerTest.java @@ -49,7 +49,7 @@ void testRequestReply() { Response response = handler.requestReply(req, new TestConnection(new RpcClient.ServerInfo())); //then assertTrue(response instanceof NotifySubscriberResponse); - verify(holder, times(1)).processServiceInfo(info); + verify(holder, times(1)).processServiceInfo(info, false); } @Test