1
0
mirror of https://github.com/apache/dubbo.git synced 2024-10-23 07:04:37 +08:00

Fix countdown assignment problem (#11275)

This commit is contained in:
huazhongming 2023-01-12 17:32:55 +08:00 committed by GitHub
parent fe50ac5d79
commit 7a5680b559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 31 additions and 82 deletions

View File

@ -41,7 +41,6 @@ import java.util.Set;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
/**
@ -106,7 +105,7 @@ public class ConsumerContextFilter implements ClusterFilter, ClusterFilter.Liste
// the subsequent calls launched by the Server side will be enabled by default,
// and support to turn off the function on a node to get rid of the timeout control.
if (invoker.getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, true)) {
context.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, countDown);
context.setObjectAttachment(TIME_COUNTDOWN_KEY, countDown);
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
if (timeoutCountDown.isExpired()) {

View File

@ -25,19 +25,23 @@ import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.service.GenericService;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GENERIC_PARAMETER_DESC;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY_LOWER;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_REFLECTIVE_OPERATION_FAILED;
import static org.apache.dubbo.rpc.Constants.$ECHO;
import static org.apache.dubbo.rpc.Constants.$ECHO_PARAMETER_DESC;
@ -276,6 +280,28 @@ public class RpcUtils {
return timeout;
}
public static int calculateTimeout(URL url, Invocation invocation, String methodName, long defaultTimeout) {
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
int timeout = (int) defaultTimeout;
if (countdown == null) {
if (url != null) {
timeout = (int) RpcUtils.getTimeout(url, methodName, RpcContext.getClientAttachment(), invocation, defaultTimeout);
if (url.getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
// pass timeout to remote server
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);
}
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
// pass timeout to remote server
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);
}
invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
return timeout;
}
private static long convertToNumber(Object obj, long defaultTimeout) {
long timeout = defaultTimeout;
try {

View File

@ -29,28 +29,22 @@ import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_VERSION;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
@ -101,7 +95,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, methodName, DEFAULT_TIMEOUT);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
@ -173,22 +167,4 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
}
}
}
private int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
int timeout;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getClientAttachment(), invocation, DEFAULT_TIMEOUT);
if (getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
}
invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
return timeout;
}
}

View File

@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceModel;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
@ -46,14 +45,10 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.LOCALHOST_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
/**
@ -106,7 +101,7 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
}
int timeout = calculateTimeout(invocation, invocation.getMethodName());
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, invocation.getMethodName(), DEFAULT_TIMEOUT);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
@ -278,23 +273,4 @@ public class InjvmInvoker<T> extends AbstractInvoker<T> {
}
return remoteUrl.getParameter(ASYNC_KEY, false);
}
private int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
int timeout;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getClientAttachment(), invocation, DEFAULT_TIMEOUT);
if (getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout); // pass timeout to remote server
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY, timeout);// pass timeout to remote server
}
invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
return timeout;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.dubbo.rpc.protocol.tri;
import io.netty.util.AsciiString;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
@ -33,7 +34,6 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.TriRpcStatus;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
@ -49,22 +49,16 @@ import org.apache.dubbo.rpc.protocol.tri.compressor.Compressor;
import org.apache.dubbo.rpc.protocol.tri.observer.ClientCallToObserverAdapter;
import org.apache.dubbo.rpc.support.RpcUtils;
import io.netty.util.AsciiString;
import java.util.Arrays;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_ATTACHMENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_DESTROY_INVOKER;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_FAILED_REQUEST;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
@ -195,7 +189,7 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
ClientCall call) {
ExecutorService callbackExecutor = getCallbackExecutor(getUrl(), invocation);
int timeout = calculateTimeout(invocation, invocation.getMethodName());
int timeout = RpcUtils.calculateTimeout(getUrl(), invocation, invocation.getMethodName(), 3000);
if (timeout <= 0) {
return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
"No time left for making the following call: " + invocation.getServiceName() + "."
@ -299,26 +293,4 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
}
}
}
private int calculateTimeout(Invocation invocation, String methodName) {
Object countdown = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
int timeout;
if (countdown == null) {
timeout = (int) RpcUtils.getTimeout(getUrl(), methodName,
RpcContext.getClientAttachment(), invocation, 3000);
if (getUrl().getMethodParameter(methodName, ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
timeout); // pass timeout to remote server
}
} else {
TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
invocation.setObjectAttachment(TIMEOUT_ATTACHMENT_KEY,
timeout);// pass timeout to remote server
}
invocation.getObjectAttachments().remove(TIME_COUNTDOWN_KEY);
return timeout;
}
}