1
0
mirror of https://github.com/apache/dubbo.git synced 2024-10-23 07:04:37 +08:00

issue #11458:Triple stub support async mode (#11464)

* issue #11458:Triple stub support async mode

* issue 11458:Triple stub support async mode

* issue 11458:Triple stub support async mode
This commit is contained in:
一个不知名的Java靓仔 2023-02-06 17:28:49 +08:00 committed by GitHub
parent 34b92b0874
commit c0b257adac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 0 deletions

View File

@ -72,6 +72,17 @@ public final class {{className}} {
{{inputType}}.class, {{outputType}}.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
private static final StubMethodDescriptor {{methodName}}AsyncMethod = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, java.util.concurrent.CompletableFuture.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod = new StubMethodDescriptor("{{originMethodName}}Async",
{{inputType}}.class, {{outputType}}.class, serviceDescriptor, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
{{/unaryMethods}}
{{#serverStreamingMethods}}
@ -121,6 +132,10 @@ public final class {{className}} {
return StubInvocationUtil.unaryCall(invoker, {{methodName}}Method, request);
}
public CompletableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request){
return StubInvocationUtil.unaryCall(invoker, {{methodName}}AsyncMethod, request);
}
{{#javaDoc}}
{{{javaDoc}}}
{{/javaDoc}}
@ -163,6 +178,21 @@ public final class {{className}} {
public static abstract class {{interfaceClassName}}ImplBase implements {{interfaceClassName}}, ServerService<{{interfaceClassName}}> {
private <T, R> BiConsumer<T, StreamObserver<R>> syncToAsync(java.util.function.Function<T, R> syncFun) {
return new BiConsumer<T, StreamObserver<R>>() {
@Override
public void accept(T t, StreamObserver<R> observer) {
try {
R ret = syncFun.apply(t);
observer.onNext(ret);
observer.onCompleted();
} catch (Throwable e) {
observer.onError(e);
}
}
};
}
@Override
public final Invoker<{{interfaceClassName}}> getInvoker(URL url) {
PathResolver pathResolver = url.getOrDefaultFrameworkModel()
@ -172,11 +202,14 @@ public final class {{className}} {
{{#methods}}
pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}" );
pathResolver.addNativeStub( "/" + SERVICE_NAME + "/{{originMethodName}}Async" );
{{/methods}}
{{#unaryMethods}}
BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> {{methodName}}Func = this::{{methodName}};
handlers.put({{methodName}}Method.getMethodName(), new UnaryStubMethodHandler<>({{methodName}}Func));
BiConsumer<{{inputType}}, StreamObserver<{{outputType}}>> {{methodName}}AsyncFunc = syncToAsync(this::{{methodName}});
handlers.put({{methodName}}ProxyAsyncMethod.getMethodName(), new UnaryStubMethodHandler<>({{methodName}}AsyncFunc));
{{/unaryMethods}}
{{#serverStreamingMethods}}

View File

@ -58,6 +58,8 @@ public class StubInvocationUtil {
methodDescriptor.getMethodName(), invoker.getInterface().getName(),
invoker.getUrl().getProtocolServiceKey(), methodDescriptor.getParameterClasses(),
arguments);
//When there are multiple MethodDescriptors with the same method name, the return type will be wrong
rpcInvocation.setReturnType(methodDescriptor.getReturnClass());
try {
return InvocationUtil.invoke(invoker, rpcInvocation);
} catch (Throwable e) {