Java 从零开始手写 RPC—Reflect 反射实现通用调用之服务端

时间:2022-09-11 08:30:36

Java 从零开始手写 RPC—Reflect 反射实现通用调用之服务端

前面我们的例子是一个固定的出参和入参,固定的方法实现。

本节将实现通用的调用,让框架具有更广泛的实用性。

基本思路

所有的方法调用,基于反射进行相关处理实现。

Java 从零开始手写 RPC—Reflect 反射实现通用调用之服务端

服务端

核心类

  • RpcServer

调整如下:

  1. serverBootstrap.group(workerGroup,bossGroup)
  2. .channel(NioServerSocketChannel.class)
  3. //打印日志
  4. .handler(newLoggingHandler(LogLevel.INFO))
  5. .childHandler(newChannelInitializer(){
  6. @Override
  7. protectedvoidinitChannel(Channelch)throwsException{
  8. ch.pipeline()
  9. //解码bytes=>resp
  10. .addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)))
  11. //request=>bytes
  12. .addLast(newObjectEncoder())
  13. .addLast(newRpcServerHandler());
  14. }
  15. })
  16. //这个参数影响的是还没有被accept取出的连接
  17. .option(ChannelOption.SO_BACKLOG,128)
  18. //这个参数只是过一段时间内客户端没有响应,服务端会发送一个ack包,以判断客户端是否还活着。
  19. .childOption(ChannelOption.SO_KEEPALIVE,true);

其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。

RpcServerHandler

  1. packagecom.github.houbb.rpc.server.handler;
  2. importcom.github.houbb.log.integration.core.Log;
  3. importcom.github.houbb.log.integration.core.LogFactory;
  4. importcom.github.houbb.rpc.common.rpc.domain.RpcRequest;
  5. importcom.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
  6. importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
  7. importio.netty.channel.ChannelHandlerContext;
  8. importio.netty.channel.SimpleChannelInboundHandler;
  9. /**
  10. *@authorbinbin.hou
  11. *@since0.0.1
  12. */
  13. publicclassRpcServerHandlerextendsSimpleChannelInboundHandler{
  14. privatestaticfinalLoglog=LogFactory.getLog(RpcServerHandler.class);
  15. @Override
  16. publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
  17. finalStringid=ctx.channel().id().asLongText();
  18. log.info("[Server]channel{}connected"+id);
  19. }
  20. @Override
  21. protectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{
  22. finalStringid=ctx.channel().id().asLongText();
  23. log.info("[Server]channelreadstart:{}",id);
  24. //接受客户端请求
  25. RpcRequestrpcRequest=(RpcRequest)msg;
  26. log.info("[Server]receivechannel{}request:{}",id,rpcRequest);
  27. //回写到client端
  28. DefaultRpcResponserpcResponse=handleRpcRequest(rpcRequest);
  29. ctx.writeAndFlush(rpcResponse);
  30. log.info("[Server]channel{}response{}",id,rpcResponse);
  31. }
  32. @Override
  33. publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
  34. cause.printStackTrace();
  35. ctx.close();
  36. }
  37. /**
  38. *处理请求信息
  39. *@paramrpcRequest请求信息
  40. *@return结果信息
  41. *@since0.0.6
  42. */
  43. privateDefaultRpcResponsehandleRpcRequest(finalRpcRequestrpcRequest){
  44. DefaultRpcResponserpcResponse=newDefaultRpcResponse();
  45. rpcResponse.seqId(rpcRequest.seqId());
  46. try{
  47. //获取对应的service实现类
  48. //rpcRequest=>invocationRequest
  49. //执行invoke
  50. Objectresult=DefaultServiceFactory.getInstance()
  51. .invoke(rpcRequest.serviceId(),
  52. rpcRequest.methodName(),
  53. rpcRequest.paramTypeNames(),
  54. rpcRequest.paramValues());
  55. rpcResponse.result(result);
  56. }catch(Exceptione){
  57. rpcResponse.error(e);
  58. log.error("[Server]executemeetexforrequest",rpcRequest,e);
  59. }
  60. //构建结果值
  61. returnrpcResponse;
  62. }
  63. }

和以前类似,不过 handleRpcRequest 要稍微麻烦一点。

这里需要根据发射,调用对应的方法。

pojo

其中使用的出参、入参实现如下:

RpcRequest

  1. packagecom.github.houbb.rpc.common.rpc.domain;
  2. importjava.util.List;
  3. /**
  4. *序列化相关处理
  5. *(1)调用创建时间-createTime
  6. *(2)调用方式callType
  7. *(3)超时时间timeOut
  8. *
  9. *额外信息:
  10. *(1)上下文信息
  11. *
  12. *@authorbinbin.hou
  13. *@since0.0.6
  14. */
  15. publicinterfaceRpcRequestextendsBaseRpc{
  16. /**
  17. *创建时间
  18. *@return创建时间
  19. *@since0.0.6
  20. */
  21. longcreateTime();
  22. /**
  23. *服务唯一标识
  24. *@return服务唯一标识
  25. *@since0.0.6
  26. */
  27. StringserviceId();
  28. /**
  29. *方法名称
  30. *@return方法名称
  31. *@since0.0.6
  32. */
  33. StringmethodName();
  34. /**
  35. *方法类型名称列表
  36. *@return名称列表
  37. *@since0.0.6
  38. */
  39. ListparamTypeNames();
  40. //调用参数信息列表
  41. /**
  42. *调用参数值
  43. *@return参数值数组
  44. *@since0.0.6
  45. */
  46. Object[]paramValues();
  47. }

RpcResponse

  1. packagecom.github.houbb.rpc.common.rpc.domain;
  2. /**
  3. *序列化相关处理
  4. *@authorbinbin.hou
  5. *@since0.0.6
  6. */
  7. publicinterfaceRpcResponseextendsBaseRpc{
  8. /**
  9. *异常信息
  10. *@return异常信息
  11. *@since0.0.6
  12. */
  13. Throwableerror();
  14. /**
  15. *请求结果
  16. *@return请求结果
  17. *@since0.0.6
  18. */
  19. Objectresult();
  20. }

BaseRpc

  1. packagecom.github.houbb.rpc.common.rpc.domain;
  2. importjava.io.Serializable;
  3. /**
  4. *序列化相关处理
  5. *@authorbinbin.hou
  6. *@since0.0.6
  7. */
  8. publicinterfaceBaseRpcextendsSerializable{
  9. /**
  10. *获取唯一标识号
  11. *(1)用来唯一标识一次调用,便于获取该调用对应的响应信息。
  12. *@return唯一标识号
  13. */
  14. StringseqId();
  15. /**
  16. *设置唯一标识号
  17. *@paramtraceId唯一标识号
  18. *@returnthis
  19. */
  20. BaseRpcseqId(finalStringtraceId);
  21. }

ServiceFactory-服务工厂

为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。

ServiceFactory

  1. packagecom.github.houbb.rpc.server.service;
  2. importcom.github.houbb.rpc.server.config.service.ServiceConfig;
  3. importcom.github.houbb.rpc.server.registry.ServiceRegistry;
  4. importjava.util.List;
  5. /**
  6. *服务方法类仓库管理类-接口
  7. *
  8. *
  9. *(1)对外暴露的方法,应该尽可能的少。
  10. *(2)对于外部的调用,后期比如telnet治理,可以使用比如有哪些服务列表?
  11. *单个服务有哪些方法名称?
  12. *
  13. *等等基础信息的查询,本期暂时全部隐藏掉。
  14. *
  15. *(3)前期尽可能的少暴露方法。
  16. *@authorbinbin.hou
  17. *@since0.0.6
  18. *@seeServiceRegistry服务注册,将服务信息放在这个类中,进行统一的管理。
  19. *@seeServiceMethod方法信息
  20. */
  21. publicinterfaceServiceFactory{
  22. /**
  23. *注册服务列表信息
  24. *@paramserviceConfigList服务配置列表
  25. *@returnthis
  26. *@since0.0.6
  27. */
  28. ServiceFactoryregisterServices(finalListserviceConfigList);
  29. /**
  30. *直接反射调用
  31. *(1)此处对于方法反射,为了提升性能,所有的class.getFullName()进行拼接然后放进key中。
  32. *
  33. *@paramserviceId服务名称
  34. *@parammethodName方法名称
  35. *@paramparamTypeNames参数类型名称列表
  36. *@paramparamValues参数值
  37. *@return方法调用返回值
  38. *@since0.0.6
  39. */
  40. Objectinvoke(finalStringserviceId,finalStringmethodName,
  41. ListparamTypeNames,finalObject[]paramValues);
  42. }

DefaultServiceFactory

作为默认实现,如下:

  1. packagecom.github.houbb.rpc.server.service.impl;
  2. importcom.github.houbb.heaven.constant.PunctuationConst;
  3. importcom.github.houbb.heaven.util.common.ArgUtil;
  4. importcom.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
  5. importcom.github.houbb.heaven.util.util.CollectionUtil;
  6. importcom.github.houbb.rpc.common.exception.RpcRuntimeException;
  7. importcom.github.houbb.rpc.server.config.service.ServiceConfig;
  8. importcom.github.houbb.rpc.server.service.ServiceFactory;
  9. importjava.lang.reflect.InvocationTargetException;
  10. importjava.lang.reflect.Method;
  11. importjava.util.HashMap;
  12. importjava.util.List;
  13. importjava.util.Map;
  14. /**
  15. *默认服务仓库实现
  16. *@authorbinbin.hou
  17. *@since0.0.6
  18. */
  19. publicclassDefaultServiceFactoryimplementsServiceFactory{
  20. /**
  21. *服务map
  22. *@since0.0.6
  23. */
  24. privateMapserviceMap;
  25. /**
  26. *直接获取对应的method信息
  27. *(1)key:serviceId:methodName:param1@param2@param3
  28. *(2)value:对应的method信息
  29. */
  30. privateMapmethodMap;
  31. privatestaticfinalDefaultServiceFactoryINSTANCE=newDefaultServiceFactory();
  32. privateDefaultServiceFactory(){}
  33. publicstaticDefaultServiceFactorygetInstance(){
  34. returnINSTANCE;
  35. }
  36. /**
  37. *服务注册一般在项目启动的时候,进行处理。
  38. *属于比较重的操作,而且一个服务按理说只应该初始化一次。
  39. *此处加锁为了保证线程安全。
  40. *@paramserviceConfigList服务配置列表
  41. *@returnthis
  42. */
  43. @Override
  44. publicsynchronizedServiceFactoryregisterServices(ListserviceConfigList){
  45. ArgUtil.notEmpty(serviceConfigList,"serviceConfigList");
  46. //集合初始化
  47. serviceMap=newHashMap<>(serviceConfigList.size());
  48. //这里只是预估,一般为2个服务。
  49. methodMap=newHashMap<>(serviceConfigList.size()*2);
  50. for(ServiceConfigserviceConfig:serviceConfigList){
  51. serviceMap.put(serviceConfig.id(),serviceConfig.reference());
  52. }
  53. //存放方法名称
  54. for(Map.Entryentry:serviceMap.entrySet()){
  55. StringserviceId=entry.getKey();
  56. Objectreference=entry.getValue();
  57. //获取所有方法列表
  58. Method[]methods=reference.getClass().getMethods();
  59. for(Methodmethod:methods){
  60. StringmethodName=method.getName();
  61. if(ReflectMethodUtil.isIgnoreMethod(methodName)){
  62. continue;
  63. }
  64. ListparamTypeNames=ReflectMethodUtil.getParamTypeNames(method);
  65. Stringkey=buildMethodKey(serviceId,methodName,paramTypeNames);
  66. methodMap.put(key,method);
  67. }
  68. }
  69. returnthis;
  70. }
  71. @Override
  72. publicObjectinvoke(StringserviceId,StringmethodName,ListparamTypeNames,Object[]paramValues){
  73. //参数校验
  74. ArgUtil.notEmpty(serviceId,"serviceId");
  75. ArgUtil.notEmpty(methodName,"methodName");
  76. //提供cache,可以根据前三个值快速定位对应的method
  77. //根据method进行反射处理。
  78. //对于paramTypes进行string连接处理。
  79. finalObjectreference=serviceMap.get(serviceId);
  80. finalStringmethodKey=buildMethodKey(serviceId,methodName,paramTypeNames);
  81. finalMethodmethod=methodMap.get(methodKey);
  82. try{
  83. returnmethod.invoke(reference,paramValues);
  84. }catch(IllegalAccessException|InvocationTargetExceptione){
  85. thrownewRpcRuntimeException(e);
  86. }
  87. }
  88. /**
  89. *(1)多个之间才用:分隔
  90. *(2)参数之间采用@分隔
  91. *@paramserviceId服务标识
  92. *@parammethodName方法名称
  93. *@paramparamTypeNames参数类型名称
  94. *@return构建完整的key
  95. *@since0.0.6
  96. */
  97. privateStringbuildMethodKey(StringserviceId,StringmethodName,ListparamTypeNames){
  98. Stringparam=CollectionUtil.join(paramTypeNames,PunctuationConst.AT);
  99. returnserviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON
  100. +param;
  101. }
  102. }

ServiceRegistry-服务注册类

接口

  1. packagecom.github.houbb.rpc.server.registry;
  2. /**
  3. *服务注册类
  4. *(1)每个应用唯一
  5. *(2)每个服务的暴露协议应该保持一致
  6. *暂时不提供单个服务的特殊处理,后期可以考虑添加
  7. *
  8. *@authorbinbin.hou
  9. *@since0.0.6
  10. */
  11. publicinterfaceServiceRegistry{
  12. /**
  13. *暴露的rpc服务端口信息
  14. *@paramport端口信息
  15. *@returnthis
  16. *@since0.0.6
  17. */
  18. ServiceRegistryport(finalintport);
  19. /**
  20. *注册服务实现
  21. *@paramserviceId服务标识
  22. *@paramserviceImpl服务实现
  23. *@returnthis
  24. *@since0.0.6
  25. */
  26. ServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl);
  27. /**
  28. *暴露所有服务信息
  29. *(1)启动服务端
  30. *@returnthis
  31. *@since0.0.6
  32. */
  33. ServiceRegistryexpose();
  34. }

实现

  1. packagecom.github.houbb.rpc.server.registry.impl;
  2. importcom.github.houbb.heaven.util.common.ArgUtil;
  3. importcom.github.houbb.rpc.common.config.protocol.ProtocolConfig;
  4. importcom.github.houbb.rpc.server.config.service.DefaultServiceConfig;
  5. importcom.github.houbb.rpc.server.config.service.ServiceConfig;
  6. importcom.github.houbb.rpc.server.core.RpcServer;
  7. importcom.github.houbb.rpc.server.registry.ServiceRegistry;
  8. importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
  9. importjava.util.ArrayList;
  10. importjava.util.List;
  11. /**
  12. *默认服务端注册类
  13. *@authorbinbin.hou
  14. *@since0.0.6
  15. */
  16. publicclassDefaultServiceRegistryimplementsServiceRegistry{
  17. /**
  18. *单例信息
  19. *@since0.0.6
  20. */
  21. privatestaticfinalDefaultServiceRegistryINSTANCE=newDefaultServiceRegistry();
  22. /**
  23. *rpc服务端端口号
  24. *@since0.0.6
  25. */
  26. privateintrpcPort;
  27. /**
  28. *协议配置
  29. *(1)默认只实现tcp
  30. *(2)后期可以拓展实现web-service/http/https等等。
  31. *@since0.0.6
  32. */
  33. privateProtocolConfigprotocolConfig;
  34. /**
  35. *服务配置列表
  36. *@since0.0.6
  37. */
  38. privateListserviceConfigList;
  39. privateDefaultServiceRegistry(){
  40. //初始化默认参数
  41. this.serviceConfigList=newArrayList<>();
  42. this.rpcPort=9527;
  43. }
  44. publicstaticDefaultServiceRegistrygetInstance(){
  45. returnINSTANCE;
  46. }
  47. @Override
  48. publicServiceRegistryport(intport){
  49. ArgUtil.positive(port,"port");
  50. this.rpcPort=port;
  51. returnthis;
  52. }
  53. /**
  54. *注册服务实现
  55. *(1)主要用于后期服务调用
  56. *(2)如何根据id获取实现?非常简单,id是唯一的。
  57. *有就是有,没有就抛出异常,直接返回。
  58. *(3)如果根据{@linkcom.github.houbb.rpc.common.rpc.domain.RpcRequest}获取对应的方法。
  59. *
  60. *3.1根据serviceId获取唯一的实现
  61. *3.2根据{@linkClass#getMethod(String,Class[])}方法名称+参数类型唯一获取方法
  62. *3.3根据{@linkjava.lang.reflect.Method#invoke(Object,Object...)}执行方法
  63. *
  64. *@paramserviceId服务标识
  65. *@paramserviceImpl服务实现
  66. *@returnthis
  67. *@since0.0.6
  68. */
  69. @Override
  70. @SuppressWarnings("unchecked")
  71. publicsynchronizedDefaultServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl){
  72. ArgUtil.notEmpty(serviceId,"serviceId");
  73. ArgUtil.notNull(serviceImpl,"serviceImpl");
  74. //构建对应的其他信息
  75. ServiceConfigserviceConfig=newDefaultServiceConfig();
  76. serviceConfig.id(serviceId).reference(serviceImpl);
  77. serviceConfigList.add(serviceConfig);
  78. returnthis;
  79. }
  80. @Override
  81. publicServiceRegistryexpose(){
  82. //注册所有服务信息
  83. DefaultServiceFactory.getInstance()
  84. .registerServices(serviceConfigList);
  85. //暴露nettyserver信息
  86. newRpcServer(rpcPort).start();
  87. returnthis;
  88. }
  89. }

ServiceConfig 是一些服务的配置信息,接口定义如下:

  1. packagecom.github.houbb.rpc.server.config.service;
  2. /**
  3. *单个服务配置类
  4. *
  5. *简化用户使用:
  6. *在用户使用的时候,这个类应该是不可见的。
  7. *直接提供对应的服务注册类即可。
  8. *
  9. *后续拓展
  10. *(1)版本信息
  11. *(2)服务端超时时间
  12. *
  13. *@authorbinbin.hou
  14. *@since0.0.6
  15. *@param实现类泛型
  16. */
  17. publicinterfaceServiceConfig{
  18. /**
  19. *获取唯一标识
  20. *@return获取唯一标识
  21. *@since0.0.6
  22. */
  23. Stringid();
  24. /**
  25. *设置唯一标识
  26. *@paramid标识信息
  27. *@returnthis
  28. *@since0.0.6
  29. */
  30. ServiceConfigid(Stringid);
  31. /**
  32. *获取引用实体实现
  33. *@return实体实现
  34. *@since0.0.6
  35. */
  36. Treference();
  37. /**
  38. *设置引用实体实现
  39. *@paramreference引用实现
  40. *@returnthis
  41. *@since0.0.6
  42. */
  43. ServiceConfigreference(Treference);
  44. }

测试

maven 引入

引入服务端的对应 maven 包:

  1. com.github.houbb
  2. rpc-server
  3. 0.0.6

服务端启动

  1. //启动服务
  2. DefaultServiceRegistry.getInstance()
  3. .register(ServiceIdConst.CALC,newCalculatorServiceImpl())
  4. .expose();

这里注册了一个计算服务,并且设置对应的实现。

和以前实现类似,此处不再赘述。

启动日志:

  1. [DEBUG][2021-10-0513:39:42.638][main][c.g.h.l.i.c.LogFactory.setImplementation]-Logginginitializedusing'classcom.github.houbb.log.integration.adaptors.stdout.StdOutExImpl'adapter.
  2. [INFO][2021-10-0513:39:42.645][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务开始启动服务端
  3. 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelRegistered
  4. 信息:[id:0xec4dc74f]REGISTERED
  5. 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerbind
  6. 信息:[id:0xec4dc74f]BIND:0.0.0.0/0.0.0.0:9527
  7. 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelActive
  8. 信息:[id:0xec4dc74f,L:/0:0:0:0:0:0:0:0:9527]ACTIVE
  9. [INFO][2021-10-0513:39:43.893][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务端启动完成,监听【9527】端口

ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。

原文链接:https://www.toutiao.com/a7017765348539892256/