[improve] update thread pool nums policy (#2606)

Signed-off-by: tomsun28 <tomsun28@outlook.com>
Co-authored-by: shown <yuluo08290126@gmail.com>
Co-authored-by: Calvin <naruse_shinji@163.com>
This commit is contained in:
tomsun28 2024-08-31 16:07:36 +08:00 committed by GitHub
parent 2763ee4dc4
commit 049c0937bc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 41 additions and 32 deletions

View File

@ -18,8 +18,8 @@
package org.apache.hertzbeat.alert;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -50,11 +50,11 @@ public class AlerterWorkerPool {
.setDaemon(true)
.setNameFormat("alerter-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(6,
workerExecutor = new ThreadPoolExecutor(10,
10,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
@ -69,10 +69,10 @@ public class AlerterWorkerPool {
.setNameFormat("notify-worker-%d")
.build();
notifyExecutor = new ThreadPoolExecutor(6,
10,
6,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
new LinkedBlockingQueue<>(),
threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}

View File

@ -69,6 +69,8 @@ import org.springframework.util.CollectionUtils;
public class CalculateAlarm {
private static final String SYSTEM_VALUE_ROW_COUNT = "system_value_row_count";
private static final int CALCULATE_THREADS = 3;
/**
* The alarm in the process is triggered
@ -129,9 +131,9 @@ public class CalculateAlarm {
}
}
};
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
workerPool.executeJob(runnable);
for (int i = 0; i < CALCULATE_THREADS; i++) {
workerPool.executeJob(runnable);
}
}
private void calculate(CollectRep.MetricsData metricsData) {

View File

@ -50,8 +50,10 @@ public class WorkerPool implements DisposableBean {
.setDaemon(true)
.setNameFormat("collect-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(100,
1024,
int coreSize = Math.max(2, Runtime.getRuntime().availableProcessors());
int maxSize = Runtime.getRuntime().availableProcessors() * 16;
workerExecutor = new ThreadPoolExecutor(coreSize,
maxSize,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),

View File

@ -55,7 +55,7 @@ public interface CommonConstants {
/**
* Response status code: Incorrect login account password
*/
byte MONITOR_LOGIN_FAILED_CODE = 0x05;
byte LOGIN_FAILED_CODE = 0x05;
/**
* Monitoring status 0: Paused, 1: Up, 2: Down

View File

@ -49,7 +49,7 @@ public class CommonThreadPool implements DisposableBean {
.setDaemon(true)
.setNameFormat("common-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(2,
workerExecutor = new ThreadPoolExecutor(1,
Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,

View File

@ -17,15 +17,6 @@
package org.apache.hertzbeat.common.support;
import java.lang.reflect.Field;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -33,6 +24,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import java.lang.reflect.Field;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* test for {@link CommonThreadPool}
@ -101,7 +99,7 @@ class CommonThreadPoolTest {
ThreadPoolExecutor workerExecutor = (ThreadPoolExecutor) workerExecutorField.get(pool);
assertNotNull(workerExecutor);
assertEquals(2, workerExecutor.getCorePoolSize());
assertEquals(1, workerExecutor.getCorePoolSize());
assertEquals(Integer.MAX_VALUE, workerExecutor.getMaximumPoolSize());
assertEquals(10, workerExecutor.getKeepAliveTime(TimeUnit.SECONDS));
assertTrue(workerExecutor.getQueue() instanceof SynchronousQueue);

View File

@ -17,8 +17,9 @@
package org.apache.hertzbeat.manager.controller;
import static org.apache.hertzbeat.common.constants.CommonConstants.MONITOR_LOGIN_FAILED_CODE;
import static org.apache.hertzbeat.common.constants.CommonConstants.LOGIN_FAILED_CODE;
import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
import io.jsonwebtoken.ExpiredJwtException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
@ -58,7 +59,7 @@ public class AccountController {
try {
return ResponseEntity.ok(Message.success(accountService.authGetToken(loginDto)));
} catch (AuthenticationException e) {
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage()));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage()));
}
}
@ -70,10 +71,13 @@ public class AccountController {
try {
return ResponseEntity.ok(Message.success(accountService.refreshToken(refreshToken)));
} catch (AuthenticationException e) {
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, e.getMessage()));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, e.getMessage()));
} catch (ExpiredJwtException expiredJwtException) {
log.warn("{}", expiredJwtException.getMessage());
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Expired"));
} catch (Exception e) {
log.error("Exception occurred during token refresh: {}", e.getClass().getName(), e);
return ResponseEntity.ok(Message.fail(MONITOR_LOGIN_FAILED_CODE, "Refresh Token Expired or Error"));
return ResponseEntity.ok(Message.fail(LOGIN_FAILED_CODE, "Refresh Token Error"));
}
}
}

View File

@ -85,7 +85,7 @@ class AccountControllerTest {
this.mockMvc.perform(MockMvcRequestBuilders.post("/api/account/auth/form")
.contentType(MediaType.APPLICATION_JSON)
.content(JsonUtil.toJson(loginDto)))
.andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE))
.andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}
@ -95,7 +95,7 @@ class AccountControllerTest {
Mockito.when(accountService.refreshToken(refreshToken)).thenThrow(new AuthenticationException());
this.mockMvc.perform(MockMvcRequestBuilders.get("/api/account/auth/refresh/{refreshToken}",
refreshToken))
.andExpect(jsonPath("$.code").value((int) CommonConstants.MONITOR_LOGIN_FAILED_CODE))
.andExpect(jsonPath("$.code").value((int) CommonConstants.LOGIN_FAILED_CODE))
.andReturn();
}
}

View File

@ -50,6 +50,8 @@ import org.apache.hertzbeat.remoting.event.NettyEventListener;
@Slf4j
public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
private static final int DEFAULT_WORKER_THREAD_NUM = Math.min(4, Runtime.getRuntime().availableProcessors());
private final NettyClientConfig nettyClientConfig;
private final CommonThreadPool threadPool;
@ -79,7 +81,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
.setDaemon(true)
.setNameFormat("netty-client-worker-%d")
.build();
this.workerGroup = new NioEventLoopGroup(threadFactory);
String envThreadNum = System.getProperty("hertzbeat.client.worker.thread.num");
int workerThreadNum = envThreadNum != null ? Integer.parseInt(envThreadNum) : DEFAULT_WORKER_THREAD_NUM;
this.workerGroup = new NioEventLoopGroup(workerThreadNum, threadFactory);
this.bootstrap.group(workerGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.nettyClientConfig.getConnectTimeoutMillis())
.channel(NioSocketChannel.class)

View File

@ -77,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void start() {
this.threadPool.execute(() -> {
int port = this.nettyServerConfig.getPort();
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder()

View File

@ -48,8 +48,8 @@ public class WarehouseWorkerPool {
.setDaemon(true)
.setNameFormat("warehouse-worker-%d")
.build();
workerExecutor = new ThreadPoolExecutor(6,
10,
workerExecutor = new ThreadPoolExecutor(2,
Integer.MAX_VALUE,
10,
TimeUnit.SECONDS,
new SynchronousQueue<>(),