mirror of
https://github.com/apache/dubbo.git
synced 2024-10-23 07:04:37 +08:00
Enhance unit test cases (#11034)
This commit is contained in:
parent
791918c471
commit
0e95ac4e39
@ -49,6 +49,7 @@ import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.dubbo.registry.kubernetes.util.KubernetesClientConst.NAMESPACE;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@ExtendWith({MockitoExtension.class})
|
||||
class KubernetesServiceDiscoveryTest {
|
||||
@ -144,7 +145,11 @@ class KubernetesServiceDiscoveryTest {
|
||||
.endAddress().endSubset()
|
||||
.build());
|
||||
|
||||
Thread.sleep(2000);
|
||||
await().until(() -> {
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> captor = ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
|
||||
return captor.getValue().getServiceInstances().size() == 2;
|
||||
});
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
|
||||
ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.times(2)).onEvent(eventArgumentCaptor.capture());
|
||||
@ -172,7 +177,11 @@ class KubernetesServiceDiscoveryTest {
|
||||
serviceInstance = new DefaultServiceInstance(SERVICE_NAME, "Test12345", 12345, ScopeModelUtil.getApplicationModel(serviceDiscovery.getUrl().getScopeModel()));
|
||||
serviceDiscovery.doUpdate(serviceInstance);
|
||||
|
||||
Thread.sleep(2000);
|
||||
await().until(() -> {
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> captor = ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
|
||||
return captor.getValue().getServiceInstances().size() == 1;
|
||||
});
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
|
||||
ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
|
||||
@ -205,7 +214,11 @@ class KubernetesServiceDiscoveryTest {
|
||||
.endSpec()
|
||||
.build());
|
||||
|
||||
Thread.sleep(2000);
|
||||
await().until(() -> {
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> captor = ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.atLeast(0)).onEvent(captor.capture());
|
||||
return captor.getValue().getServiceInstances().size() == 1;
|
||||
});
|
||||
ArgumentCaptor<ServiceInstancesChangedEvent> eventArgumentCaptor =
|
||||
ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
|
||||
Mockito.verify(mockListener, Mockito.times(1)).onEvent(eventArgumentCaptor.capture());
|
||||
@ -230,4 +243,4 @@ class KubernetesServiceDiscoveryTest {
|
||||
|
||||
serviceDiscovery.doUnregister(serviceInstance);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -488,6 +488,14 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
void destroy() {
|
||||
cancelRetryTask();
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated only for test
|
||||
*/
|
||||
@Deprecated
|
||||
ScheduledExecutorService getRetryExecutor() {
|
||||
return retryExecutor;
|
||||
}
|
||||
}
|
||||
|
||||
private void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, List<String> urls) {
|
||||
@ -515,4 +523,19 @@ public abstract class AbstractMetadataReport implements MetadataReport {
|
||||
|
||||
protected abstract String doGetSubscribedURLs(SubscriberMetadataIdentifier subscriberMetadataIdentifier);
|
||||
|
||||
/**
|
||||
* @deprecated only for unit test
|
||||
*/
|
||||
@Deprecated
|
||||
protected ExecutorService getReportCacheExecutor() {
|
||||
return reportCacheExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated only for unit test
|
||||
*/
|
||||
@Deprecated
|
||||
protected MetadataReportRetry getMetadataReportRetry() {
|
||||
return metadataReportRetry;
|
||||
}
|
||||
}
|
||||
|
@ -40,11 +40,15 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@ -70,11 +74,11 @@ class AbstractMetadataReportTest {
|
||||
void testGetProtocol() {
|
||||
URL url = URL.valueOf("dubbo://" + NetUtils.getLocalAddress().getHostName() + ":4444/org.apache.dubbo.TestService?version=1.0.0&application=vic&side=provider");
|
||||
String protocol = abstractMetadataReport.getProtocol(url);
|
||||
assertEquals(protocol, "provider");
|
||||
assertEquals("provider", protocol);
|
||||
|
||||
URL url2 = URL.valueOf("consumer://" + NetUtils.getLocalAddress().getHostName() + ":4444/org.apache.dubbo.TestService?version=1.0.0&application=vic");
|
||||
String protocol2 = abstractMetadataReport.getProtocol(url2);
|
||||
assertEquals(protocol2, "consumer");
|
||||
assertEquals("consumer", protocol2);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -83,8 +87,11 @@ class AbstractMetadataReportTest {
|
||||
String version = "1.0.0";
|
||||
String group = null;
|
||||
String application = "vic";
|
||||
ThreadPoolExecutor reportCacheExecutor = (ThreadPoolExecutor) abstractMetadataReport.getReportCacheExecutor();
|
||||
|
||||
long completedTaskCount1 = reportCacheExecutor.getCompletedTaskCount();
|
||||
MetadataIdentifier providerMetadataIdentifier = storeProvider(abstractMetadataReport, interfaceName, version, group, application);
|
||||
Thread.sleep(1500);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount1);
|
||||
Assertions.assertNotNull(abstractMetadataReport.store.get(providerMetadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)));
|
||||
}
|
||||
|
||||
@ -111,9 +118,12 @@ class AbstractMetadataReportTest {
|
||||
String version = "1.0.0";
|
||||
String group = null;
|
||||
String application = "vic";
|
||||
MetadataIdentifier providerMetadataIdentifier = storeProvider(singleMetadataReport, interfaceName, version, group, application);
|
||||
ThreadPoolExecutor reportCacheExecutor = (ThreadPoolExecutor) singleMetadataReport.getReportCacheExecutor();
|
||||
|
||||
long completedTaskCount1 = reportCacheExecutor.getCompletedTaskCount();
|
||||
MetadataIdentifier providerMetadataIdentifier = storeProvider(singleMetadataReport, interfaceName, version, group, application);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount1);
|
||||
|
||||
Thread.sleep(2000);
|
||||
assertTrue(singleMetadataReport.file.exists());
|
||||
assertTrue(singleMetadataReport.properties.containsKey(providerMetadataIdentifier.getUniqueKey(KeyTypeEnum.UNIQUE_KEY)));
|
||||
}
|
||||
@ -134,14 +144,20 @@ class AbstractMetadataReportTest {
|
||||
assertTrue(retryReport.failedReports.isEmpty());
|
||||
|
||||
|
||||
ThreadPoolExecutor reportCacheExecutor = (ThreadPoolExecutor) retryReport.getReportCacheExecutor();
|
||||
ScheduledThreadPoolExecutor retryExecutor = (ScheduledThreadPoolExecutor) retryReport.getMetadataReportRetry().getRetryExecutor();
|
||||
|
||||
long completedTaskCount1 = reportCacheExecutor.getCompletedTaskCount();
|
||||
long completedTaskCount2 = retryExecutor.getCompletedTaskCount();
|
||||
storeProvider(retryReport, interfaceName, version, group, application);
|
||||
Thread.sleep(150);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount1);
|
||||
|
||||
assertTrue(retryReport.store.isEmpty());
|
||||
assertFalse(retryReport.failedReports.isEmpty());
|
||||
assertNotNull(retryReport.metadataReportRetry.retryScheduledFuture);
|
||||
Thread.sleep(2000L);
|
||||
assertTrue(retryReport.metadataReportRetry.retryCounter.get() != 0);
|
||||
|
||||
await().until(() -> retryExecutor.getCompletedTaskCount() > completedTaskCount2 + 2);
|
||||
assertNotEquals(0, retryReport.metadataReportRetry.retryCounter.get());
|
||||
assertTrue(retryReport.metadataReportRetry.retryCounter.get() >= 3);
|
||||
assertFalse(retryReport.store.isEmpty());
|
||||
assertTrue(retryReport.failedReports.isEmpty());
|
||||
@ -158,14 +174,15 @@ class AbstractMetadataReportTest {
|
||||
retryReport.metadataReportRetry.retryPeriod = 150L;
|
||||
retryReport.metadataReportRetry.retryTimesIfNonFail = 2;
|
||||
|
||||
ScheduledThreadPoolExecutor retryExecutor = (ScheduledThreadPoolExecutor) retryReport.getMetadataReportRetry().getRetryExecutor();
|
||||
long completedTaskCount = retryExecutor.getCompletedTaskCount();
|
||||
storeProvider(retryReport, interfaceName, version, group, application);
|
||||
|
||||
// Wait for the assignment of retryScheduledFuture to complete
|
||||
while (retryReport.metadataReportRetry.retryScheduledFuture == null) {
|
||||
}
|
||||
await().until(() -> retryReport.metadataReportRetry.retryScheduledFuture != null);
|
||||
assertFalse(retryReport.metadataReportRetry.retryScheduledFuture.isCancelled());
|
||||
assertFalse(retryReport.metadataReportRetry.retryExecutor.isShutdown());
|
||||
Thread.sleep(1000L);
|
||||
await().until(() -> retryExecutor.getCompletedTaskCount() > completedTaskCount + 2);
|
||||
assertTrue(retryReport.metadataReportRetry.retryScheduledFuture.isCancelled());
|
||||
assertTrue(retryReport.metadataReportRetry.retryExecutor.isShutdown());
|
||||
|
||||
@ -198,6 +215,7 @@ class AbstractMetadataReportTest {
|
||||
|
||||
@Test
|
||||
void testPublishAll() throws ClassNotFoundException, InterruptedException {
|
||||
ThreadPoolExecutor reportCacheExecutor = (ThreadPoolExecutor) abstractMetadataReport.getReportCacheExecutor();
|
||||
|
||||
assertTrue(abstractMetadataReport.store.isEmpty());
|
||||
assertTrue(abstractMetadataReport.allMetadataReports.isEmpty());
|
||||
@ -205,34 +223,38 @@ class AbstractMetadataReportTest {
|
||||
String version = "1.0.0";
|
||||
String group = null;
|
||||
String application = "vic";
|
||||
long completedTaskCount1 = reportCacheExecutor.getCompletedTaskCount();
|
||||
MetadataIdentifier providerMetadataIdentifier1 = storeProvider(abstractMetadataReport, interfaceName, version, group, application);
|
||||
Thread.sleep(1000);
|
||||
assertEquals(abstractMetadataReport.allMetadataReports.size(), 1);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount1);
|
||||
assertEquals(1, abstractMetadataReport.allMetadataReports.size());
|
||||
assertTrue(((FullServiceDefinition) abstractMetadataReport.allMetadataReports.get(providerMetadataIdentifier1)).getParameters().containsKey("testPKey"));
|
||||
|
||||
long completedTaskCount2 = reportCacheExecutor.getCompletedTaskCount();
|
||||
MetadataIdentifier providerMetadataIdentifier2 = storeProvider(abstractMetadataReport, interfaceName, version + "_2", group + "_2", application);
|
||||
Thread.sleep(1000);
|
||||
assertEquals(abstractMetadataReport.allMetadataReports.size(), 2);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount2);
|
||||
assertEquals(2, abstractMetadataReport.allMetadataReports.size());
|
||||
assertTrue(((FullServiceDefinition) abstractMetadataReport.allMetadataReports.get(providerMetadataIdentifier2)).getParameters().containsKey("testPKey"));
|
||||
assertEquals(((FullServiceDefinition) abstractMetadataReport.allMetadataReports.get(providerMetadataIdentifier2)).getParameters().get("version"), version + "_2");
|
||||
|
||||
Map<String, String> tmpMap = new HashMap<>();
|
||||
tmpMap.put("testKey", "value");
|
||||
long completedTaskCount3 = reportCacheExecutor.getCompletedTaskCount();
|
||||
MetadataIdentifier consumerMetadataIdentifier = storeConsumer(abstractMetadataReport, interfaceName, version + "_3", group + "_3", application, tmpMap);
|
||||
Thread.sleep(1000);
|
||||
assertEquals(abstractMetadataReport.allMetadataReports.size(), 3);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount3);
|
||||
assertEquals(3, abstractMetadataReport.allMetadataReports.size());
|
||||
|
||||
Map tmpMapResult = (Map) abstractMetadataReport.allMetadataReports.get(consumerMetadataIdentifier);
|
||||
assertEquals(tmpMapResult.get("testPKey"), "9090");
|
||||
assertEquals(tmpMapResult.get("testKey"), "value");
|
||||
assertEquals("9090", tmpMapResult.get("testPKey"));
|
||||
assertEquals("value", tmpMapResult.get("testKey"));
|
||||
assertEquals(3, abstractMetadataReport.store.size());
|
||||
|
||||
abstractMetadataReport.store.clear();
|
||||
|
||||
assertEquals(0, abstractMetadataReport.store.size());
|
||||
|
||||
long completedTaskCount4 = reportCacheExecutor.getCompletedTaskCount();
|
||||
abstractMetadataReport.publishAll();
|
||||
Thread.sleep(200);
|
||||
await().until(() -> reportCacheExecutor.getCompletedTaskCount() > completedTaskCount4);
|
||||
|
||||
assertEquals(3, abstractMetadataReport.store.size());
|
||||
|
||||
|
@ -37,6 +37,8 @@ import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
|
||||
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
|
||||
@ -51,12 +53,12 @@ import static org.apache.dubbo.monitor.Constants.MAX_CONCURRENT_KEY;
|
||||
import static org.apache.dubbo.monitor.Constants.MAX_ELAPSED_KEY;
|
||||
import static org.apache.dubbo.monitor.Constants.OUTPUT_KEY;
|
||||
import static org.apache.dubbo.monitor.Constants.SUCCESS_KEY;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.hamcrest.CoreMatchers.hasItem;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.mock;
|
||||
@ -108,22 +110,20 @@ class DubboMonitorTest {
|
||||
void testCount() throws Exception {
|
||||
DubboMonitor monitor = new DubboMonitor(monitorInvoker, monitorService);
|
||||
URL statistics = new URLBuilder(DUBBO_PROTOCOL, "10.20.153.10", 0)
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
monitor.collect(statistics.toSerializableURL());
|
||||
monitor.send();
|
||||
while (lastStatistics == null) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
await().atMost(60, TimeUnit.SECONDS).until(() -> lastStatistics != null);
|
||||
Assertions.assertEquals("morgan", lastStatistics.getParameter(APPLICATION_KEY));
|
||||
Assertions.assertEquals("dubbo", lastStatistics.getProtocol());
|
||||
Assertions.assertEquals("10.20.153.10", lastStatistics.getHost());
|
||||
@ -144,17 +144,17 @@ class DubboMonitorTest {
|
||||
void testMonitorFactory() throws Exception {
|
||||
MockMonitorService monitorService = new MockMonitorService();
|
||||
URL statistics = new URLBuilder(DUBBO_PROTOCOL, "10.20.153.10", 0)
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
|
||||
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
|
||||
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
|
||||
@ -171,14 +171,18 @@ class DubboMonitorTest {
|
||||
}
|
||||
try {
|
||||
monitor.collect(statistics.toSerializableURL());
|
||||
int i = 0;
|
||||
while (monitorService.getStatistics() == null && i < 200) {
|
||||
i++;
|
||||
Thread.sleep(10);
|
||||
}
|
||||
URL result = monitorService.getStatistics();
|
||||
Assertions.assertEquals(1, result.getParameter(SUCCESS_KEY, 0));
|
||||
Assertions.assertEquals(3, result.getParameter(ELAPSED_KEY, 0));
|
||||
await()
|
||||
.atLeast(10, TimeUnit.MILLISECONDS)
|
||||
.atMost(60, TimeUnit.SECONDS)
|
||||
.until(() -> monitorService.getStatistics().stream().anyMatch(s -> s.getParameter(SUCCESS_KEY, 0) == 1));
|
||||
|
||||
List<URL> statisticsUrls = monitorService.getStatistics();
|
||||
Optional<URL> url = statisticsUrls.stream()
|
||||
.filter(s -> s.getParameter(SUCCESS_KEY, 0) == 1)
|
||||
.findFirst();
|
||||
Assertions.assertTrue(url.isPresent());
|
||||
Assertions.assertEquals(1, url.get().getParameter(SUCCESS_KEY, 0));
|
||||
Assertions.assertEquals(3, url.get().getParameter(ELAPSED_KEY, 0));
|
||||
} finally {
|
||||
monitor.destroy();
|
||||
}
|
||||
@ -206,17 +210,17 @@ class DubboMonitorTest {
|
||||
@Test
|
||||
void testSum() {
|
||||
URL statistics = new URLBuilder(DUBBO_PROTOCOL, "10.20.153.11", 0)
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
.addParameter(APPLICATION_KEY, "morgan")
|
||||
.addParameter(INTERFACE_KEY, "MemberService")
|
||||
.addParameter(METHOD_KEY, "findPerson")
|
||||
.addParameter(CONSUMER, "10.20.153.11")
|
||||
.addParameter(SUCCESS_KEY, 1)
|
||||
.addParameter(FAILURE_KEY, 0)
|
||||
.addParameter(ELAPSED_KEY, 3)
|
||||
.addParameter(MAX_ELAPSED_KEY, 3)
|
||||
.addParameter(CONCURRENT_KEY, 1)
|
||||
.addParameter(MAX_CONCURRENT_KEY, 1)
|
||||
.build();
|
||||
Invoker invoker = mock(Invoker.class);
|
||||
MonitorService monitorService = mock(MonitorService.class);
|
||||
|
||||
@ -225,7 +229,7 @@ class DubboMonitorTest {
|
||||
|
||||
dubboMonitor.collect(statistics.toSerializableURL());
|
||||
dubboMonitor.collect(statistics.addParameter(SUCCESS_KEY, 3).addParameter(CONCURRENT_KEY, 2)
|
||||
.addParameter(INPUT_KEY, 1).addParameter(OUTPUT_KEY, 2).toSerializableURL());
|
||||
.addParameter(INPUT_KEY, 1).addParameter(OUTPUT_KEY, 2).toSerializableURL());
|
||||
dubboMonitor.collect(statistics.addParameter(SUCCESS_KEY, 6).addParameter(ELAPSED_KEY, 2).toSerializableURL());
|
||||
|
||||
dubboMonitor.send();
|
||||
@ -256,6 +260,6 @@ class DubboMonitorTest {
|
||||
|
||||
dubboMonitor.lookup(queryUrl);
|
||||
|
||||
verify(monitorService).lookup(eq(queryUrl));
|
||||
verify(monitorService).lookup(queryUrl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,26 +19,26 @@ package org.apache.dubbo.monitor.dubbo;
|
||||
import org.apache.dubbo.common.URL;
|
||||
import org.apache.dubbo.monitor.MonitorService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
* MockMonitorService
|
||||
*/
|
||||
public class MockMonitorService implements MonitorService {
|
||||
|
||||
private URL statistics;
|
||||
private List<URL> statistics = new CopyOnWriteArrayList<>();
|
||||
|
||||
public void collect(URL statistics) {
|
||||
this.statistics = statistics;
|
||||
this.statistics.add(statistics);
|
||||
}
|
||||
|
||||
public URL getStatistics() {
|
||||
public List<URL> getStatistics() {
|
||||
return statistics;
|
||||
}
|
||||
|
||||
public List<URL> lookup(URL query) {
|
||||
return Arrays.asList(statistics);
|
||||
return statistics;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -23,11 +23,16 @@ import org.apache.dubbo.config.ApplicationConfig;
|
||||
import org.apache.dubbo.registry.client.migration.model.MigrationRule;
|
||||
import org.apache.dubbo.rpc.model.ApplicationModel;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
class MigrationRuleListenerTest {
|
||||
|
||||
private String localRule = "key: demo-consumer\n" +
|
||||
@ -70,6 +75,12 @@ class MigrationRuleListenerTest {
|
||||
"force: false\n" +
|
||||
"interfaces:\n";
|
||||
|
||||
@AfterEach
|
||||
public void tearDown() {
|
||||
ApplicationModel.reset();
|
||||
System.clearProperty("dubbo.application.migration.delay");
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener started with config center and local rule, no initial remote rule.
|
||||
* Check local rule take effect
|
||||
@ -89,21 +100,33 @@ class MigrationRuleListenerTest {
|
||||
Mockito.when(consumerURL.getServiceKey()).thenReturn("Test");
|
||||
Mockito.when(consumerURL.getParameter("timestamp")).thenReturn("1");
|
||||
|
||||
System.setProperty("dubbo.application.migration.delay", "1000");
|
||||
System.setProperty("dubbo.application.migration.delay", "1");
|
||||
MigrationRuleHandler<?> handler = Mockito.mock(MigrationRuleHandler.class, Mockito.withSettings().verboseLogging());
|
||||
|
||||
MigrationRuleListener migrationRuleListener = new MigrationRuleListener(ApplicationModel.defaultModel().getDefaultModule());
|
||||
CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
MigrationRuleListener migrationRuleListener = new MigrationRuleListener(ApplicationModel.defaultModel().getDefaultModule()) {
|
||||
@Override
|
||||
public synchronized void process(ConfigChangedEvent event) {
|
||||
try {
|
||||
countDownLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
super.process(event);
|
||||
}
|
||||
};
|
||||
|
||||
MigrationInvoker<?> migrationInvoker = Mockito.mock(MigrationInvoker.class);
|
||||
migrationRuleListener.getHandlers().put(migrationInvoker, handler);
|
||||
|
||||
Thread.sleep(2000);
|
||||
Mockito.verify(handler, Mockito.timeout(5000)).doMigrate(Mockito.any());
|
||||
countDownLatch.countDown();
|
||||
await().untilAsserted(() -> {
|
||||
Mockito.verify(handler).doMigrate(Mockito.any());
|
||||
});
|
||||
// Mockito.verify(handler, Mockito.timeout(5000)).doMigrate(Mockito.any());
|
||||
|
||||
migrationRuleListener.onRefer(null, migrationInvoker, consumerURL, null);
|
||||
Mockito.verify(handler, Mockito.times(2)).doMigrate(Mockito.any());
|
||||
|
||||
ApplicationModel.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -135,8 +158,6 @@ class MigrationRuleListenerTest {
|
||||
Assertions.assertNull(migrationRuleListener.localRuleMigrationFuture);
|
||||
Assertions.assertNull(migrationRuleListener.ruleMigrationFuture);
|
||||
Assertions.assertEquals(0, migrationRuleListener.ruleQueue.size());
|
||||
|
||||
ApplicationModel.reset();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -163,7 +184,7 @@ class MigrationRuleListenerTest {
|
||||
Mockito.when(consumerURL2.getServiceKey()).thenReturn("Test2");
|
||||
Mockito.when(consumerURL2.getParameter("timestamp")).thenReturn("2");
|
||||
|
||||
System.setProperty("dubbo.application.migration.delay", "1000");
|
||||
System.setProperty("dubbo.application.migration.delay", "10");
|
||||
MigrationRuleHandler<?> handler = Mockito.mock(MigrationRuleHandler.class, Mockito.withSettings().verboseLogging());
|
||||
MigrationRuleHandler<?> handler2 = Mockito.mock(MigrationRuleHandler.class, Mockito.withSettings().verboseLogging());
|
||||
|
||||
@ -184,12 +205,10 @@ class MigrationRuleListenerTest {
|
||||
Mockito.verify(handler, Mockito.times(1)).doMigrate(captor.capture());
|
||||
Assertions.assertEquals(tmpRemoteRule, captor.getValue());
|
||||
|
||||
Thread.sleep(3000);
|
||||
await().until(() -> migrationRuleListener.localRuleMigrationFuture.isDone());
|
||||
Assertions.assertNull(migrationRuleListener.ruleMigrationFuture);
|
||||
// MigrationRule tmpLocalRule = migrationRuleListener.getRule();
|
||||
// ArgumentCaptor<MigrationRule> captorLocalRule = ArgumentCaptor.forClass(MigrationRule.class);
|
||||
// Mockito.verify(handler, Mockito.times(2)).doMigrate(captorLocalRule.capture());
|
||||
// Assertions.assertEquals(tmpLocalRule, captorLocalRule.getValue());
|
||||
Assertions.assertEquals(tmpRemoteRule, migrationRuleListener.getRule());
|
||||
Mockito.verify(handler, Mockito.times(1)).doMigrate(Mockito.any());
|
||||
|
||||
ArgumentCaptor<MigrationRule> captor2 = ArgumentCaptor.forClass(MigrationRule.class);
|
||||
migrationRuleListener.getHandlers().put(migrationInvoker2, handler2);
|
||||
@ -199,14 +218,18 @@ class MigrationRuleListenerTest {
|
||||
|
||||
|
||||
migrationRuleListener.process(new ConfigChangedEvent("key", "group", dynamicRemoteRule));
|
||||
Thread.sleep(1000);
|
||||
|
||||
await().until(migrationRuleListener.ruleQueue::isEmpty);
|
||||
await().untilAsserted(() -> {
|
||||
Mockito.verify(handler, Mockito.times(2)).doMigrate(Mockito.any());
|
||||
Mockito.verify(handler2, Mockito.times(2)).doMigrate(Mockito.any());
|
||||
});
|
||||
|
||||
Assertions.assertNotNull(migrationRuleListener.ruleMigrationFuture);
|
||||
ArgumentCaptor<MigrationRule> captor_event = ArgumentCaptor.forClass(MigrationRule.class);
|
||||
Mockito.verify(handler, Mockito.times(2)).doMigrate(captor_event.capture());
|
||||
Assertions.assertEquals("APPLICATION_FIRST", captor_event.getValue().getStep().toString());
|
||||
Mockito.verify(handler2, Mockito.times(2)).doMigrate(captor_event.capture());
|
||||
Assertions.assertEquals("APPLICATION_FIRST", captor_event.getValue().getStep().toString());
|
||||
|
||||
ApplicationModel.reset();
|
||||
}
|
||||
}
|
||||
|
7
pom.xml
7
pom.xml
@ -89,6 +89,7 @@
|
||||
<properties>
|
||||
<!-- Test libs -->
|
||||
<junit_jupiter_version>5.8.1</junit_jupiter_version>
|
||||
<awaitility_version>4.2.0</awaitility_version>
|
||||
<hazelcast_version>3.11.1</hazelcast_version>
|
||||
<hamcrest_version>2.2</hamcrest_version>
|
||||
<hibernate_validator_version>5.2.4.Final</hibernate_validator_version>
|
||||
@ -194,6 +195,12 @@
|
||||
<version>${junit_jupiter_version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<version>${awaitility_version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest</artifactId>
|
||||
|
Loading…
Reference in New Issue
Block a user