设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 数据 创业者 手机
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

Java从零开展手写RPC—Reflect反射实现通用调用之服务端

发布时间:2021-11-04 09:48 所属栏目:51 来源:互联网
导读:前面我们的例子是一个固定的出参和入参,固定的方法实现。 本节将实现通用的调用,让框架具有更广泛的实用性。 基本思路 所有的方法调用,基于反射进行相关处理实现。 java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端 服务端 核心类 RpcServer
 
 
前面我们的例子是一个固定的出参和入参,固定的方法实现。
 
本节将实现通用的调用,让框架具有更广泛的实用性。
 
基本思路
所有的方法调用,基于反射进行相关处理实现。
 
java 从零开始手写 RPC (05) reflect 反射实现通用调用之服务端
服务端
核心类
RpcServer
调整如下:
 
serverBootstrap.group(workerGroup, bossGroup) 
    .channel(NioServerSocketChannel.class) 
    // 打印日志 
    .handler(new LoggingHandler(LogLevel.INFO)) 
    .childHandler(new ChannelInitializer<Channel>() { 
        @Override 
        protected void initChannel(Channel ch) throws Exception { 
            ch.pipeline() 
            // 解码 bytes=>resp 
            .addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))) 
             // request=>bytes 
             .addLast(new ObjectEncoder()) 
             .addLast(new RpcServerHandler()); 
        } 
    }) 
    // 这个参数影响的是还没有被accept 取出的连接 
    .option(ChannelOption.SO_BACKLOG, 128) 
    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 
    .childOption(ChannelOption.SO_KEEPALIVE, true); 
其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。
 
RpcServerHandler
package com.github.houbb.rpc.server.handler; 
 
 
import com.github.houbb.log.integration.core.Log; 
import com.github.houbb.log.integration.core.LogFactory; 
import com.github.houbb.rpc.common.rpc.domain.RpcRequest; 
import com.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse; 
import com.github.houbb.rpc.server.service.impl.DefaultServiceFactory; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
 
 
/** 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class RpcServerHandler extends SimpleChannelInboundHandler { 
 
 
    private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
 
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        final String id = ctx.channel().id().asLongText(); 
        log.info("[Server] channel {} connected " + id); 
    } 
 
 
    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
        final String id = ctx.channel().id().asLongText(); 
        log.info("[Server] channel read start: {}", id); 
 
 
        // 接受客户端请求 
        RpcRequest rpcRequest = (RpcRequest)msg; 
        log.info("[Server] receive channel {} request: {}", id, rpcRequest); 
 
 
        // 回写到 client 端 
        DefaultRpcResponse rpcResponse = handleRpcRequest(rpcRequest); 
        ctx.writeAndFlush(rpcResponse); 
        log.info("[Server] channel {} response {}", id, rpcResponse); 
    } 
 
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
        cause.printStackTrace(); 
        ctx.close(); 
    } 
 
 
    /** 
     * 处理请求信息 
     * @param rpcRequest 请求信息 
     * @return 结果信息 
     * @since 0.0.6 
     */ 
    private DefaultRpcResponse handleRpcRequest(final RpcRequest rpcRequest) { 
        DefaultRpcResponse rpcResponse = new DefaultRpcResponse(); 
        rpcResponse.seqId(rpcRequest.seqId()); 
 
 
        try { 
            // 获取对应的 service 实现类 
            // rpcRequest=>invocationRequest 
            // 执行 invoke 
            Object result = DefaultServiceFactory.getInstance() 
                    .invoke(rpcRequest.serviceId(), 
                            rpcRequest.methodName(), 
                            rpcRequest.paramTypeNames(), 
                            rpcRequest.paramValues()); 
            rpcResponse.result(result); 
        } catch (Exception e) { 
            rpcResponse.error(e); 
            log.error("[Server] execute meet ex for request", rpcRequest, e); 
        } 
 
 
        // 构建结果值 
        return rpcResponse; 
    } 
 
 
和以前类似,不过 handleRpcRequest 要稍微麻烦一点。
 
这里需要根据发射,调用对应的方法。
 
pojo
其中使用的出参、入参实现如下:
 
RpcRequest
package com.github.houbb.rpc.common.rpc.domain; 
 
 
import java.util.List; 
 
 
/** 
 * 序列化相关处理 
 * (1)调用创建时间-createTime 
 * (2)调用方式 callType 
 * (3)超时时间 timeOut 
 * 
 * 额外信息: 
 * (1)上下文信息 
 * 
 * @author binbin.hou 
 * @since 0.0.6 
 */ 
public interface RpcRequest extends BaseRpc { 
 
 
    /** 
     * 创建时间 
     * @return 创建时间 
     * @since 0.0.6 
     */ 
    long createTime(); 
 
 
    /** 
     * 服务唯一标识 
     * @return 服务唯一标识 
     * @since 0.0.6 
     */ 
    String serviceId(); 
 
 
    /** 
     * 方法名称 
     * @return 方法名称 
     * @since 0.0.6 
     */ 
    String methodName(); 
 
 
    /** 
     * 方法类型名称列表 
     * @return 名称列表 
     * @since 0.0.6 
     */ 
    List<String> paramTypeNames(); 
 
 
    // 调用参数信息列表 
 
 
    /** 
     * 调用参数值 
     * @return 参数值数组 
     * @since 0.0.6 
     */ 
    Object[] paramValues(); 
 
 
RpcResponse
package com.github.houbb.rpc.common.rpc.domain; 
 
 
/** 
 * 序列化相关处理 
 * @author binbin.hou 
 * @since 0.0.6 
 */ 
public interface RpcResponse extends BaseRpc { 
 
 
    /** 
     * 异常信息 
     * @return 异常信息 
     * @since 0.0.6 
     */ 
    Throwable error(); 
 
 
    /** 
     * 请求结果 
     * @return 请求结果 
     * @since 0.0.6 
     */ 
    Object result(); 
 
 
BaseRpc
package com.github.houbb.rpc.common.rpc.domain; 
 
 
import java.io.Serializable; 
 
 
/** 
 * 序列化相关处理 
 * @author binbin.hou 
 * @since 0.0.6 
 */ 
public interface BaseRpc extends Serializable { 
 
 
    /** 
     * 获取唯一标识号 
     * (1)用来唯一标识一次调用,便于获取该调用对应的响应信息。 
     * @return 唯一标识号 
     */ 
    String seqId(); 
 
 
    /** 
     * 设置唯一标识号 
     * @param traceId 唯一标识号 
     * @return this 
     */ 
    BaseRpc seqId(final String traceId); 
 
 
ServiceFactory-服务工厂
为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。
 
ServiceFactory
package com.github.houbb.rpc.server.service; 
 
 
import com.github.houbb.rpc.server.config.service.ServiceConfig; 
import com.github.houbb.rpc.server.registry.ServiceRegistry; 
 
 
import java.util.List; 
 
 
/** 
 * 服务方法类仓库管理类-接口 
 * 
 * 
 * (1)对外暴露的方法,应该尽可能的少。 
 * (2)对于外部的调用,后期比如 telnet 治理,可以使用比如有哪些服务列表? 
 * 单个服务有哪些方法名称? 
 * 
 * 等等基础信息的查询,本期暂时全部隐藏掉。 
 * 
 * (3)前期尽可能的少暴露方法。 
 * @author binbin.hou 
 * @since 0.0.6 
 * @see ServiceRegistry 服务注册,将服务信息放在这个类中,进行统一的管理。 
 * @see ServiceMethod 方法信息 
 */ 
public interface ServiceFactory { 
 
 
    /** 
     * 注册服务列表信息 
     * @param serviceConfigList 服务配置列表 
     * @return this 
     * @since 0.0.6 
     */ 
    ServiceFactory registerServices(final List<ServiceConfig> serviceConfigList); 
 
 
    /** 
     * 直接反射调用 
     * (1)此处对于方法反射,为了提升性能,所有的 class.getFullName() 进行拼接然后放进 key 中。 
     * 
     * @param serviceId 服务名称 
     * @param methodName 方法名称 
     * @param paramTypeNames 参数类型名称列表 
     * @param paramValues 参数值 
     * @return 方法调用返回值 
     * @since 0.0.6 
     */ 
    Object invoke(final String serviceId, final String methodName, 
                  List<String> paramTypeNames, final Object[] paramValues); 
 
 
DefaultServiceFactory
作为默认实现,如下:
 
package com.github.houbb.rpc.server.service.impl; 
 
 
import com.github.houbb.heaven.constant.PunctuationConst; 
import com.github.houbb.heaven.util.common.ArgUtil; 
import com.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil; 
import com.github.houbb.heaven.util.util.CollectionUtil; 
import com.github.houbb.rpc.common.exception.RpcRuntimeException; 
import com.github.houbb.rpc.server.config.service.ServiceConfig; 
import com.github.houbb.rpc.server.service.ServiceFactory; 
 
 
import java.lang.reflect.InvocationTargetException; 
import java.lang.reflect.Method; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
 
 
/** 
 * 默认服务仓库实现 
 * @author binbin.hou 
 * @since 0.0.6 
 */ 
public class DefaultServiceFactory implements ServiceFactory { 
 
 
    /** 
     * 服务 map 
     * @since 0.0.6 
     */ 
    private Map<String, Object> serviceMap; 
 
 
    /** 
     * 直接获取对应的 method 信息 
     * (1)key: serviceId:methodName:param1@param2@param3 
     * (2)value: 对应的 method 信息 
     */ 
    private Map<String, Method> methodMap; 
 
 
    private static final DefaultServiceFactory INSTANCE = new DefaultServiceFactory(); 
 
 
    private DefaultServiceFactory(){} 
 
 
    public static DefaultServiceFactory getInstance() { 
        return INSTANCE; 
    } 
 
 
    /** 
     * 服务注册一般在项目启动的时候,进行处理。 
     * 属于比较重的操作,而且一个服务按理说只应该初始化一次。 
     * 此处加锁为了保证线程安全。 
     * @param serviceConfigList 服务配置列表 
     * @return this 
     */ 
    @Override 
    public synchronized ServiceFactory registerServices(List<ServiceConfig> serviceConfigList) { 
        ArgUtil.notEmpty(serviceConfigList, "serviceConfigList"); 
 
 
        // 集合初始化 
        serviceMap = new HashMap<>(serviceConfigList.size()); 
        // 这里只是预估,一般为2个服务。 
        methodMap = new HashMap<>(serviceConfigList.size()*2); 
 
 
        for(ServiceConfig serviceConfig : serviceConfigList) { 
            serviceMap.put(serviceConfig.id(), serviceConfig.reference()); 
        } 
 
 
        // 存放方法名称 
        for(Map.Entry<String, Object> entry : serviceMap.entrySet()) { 
            String serviceId = entry.getKey(); 
            Object reference = entry.getValue(); 
 
 
            //获取所有方法列表 
            Method[] methods = reference.getClass().getMethods(); 
            for(Method method : methods) { 
                String methodName = method.getName(); 
                if(ReflectMethodUtil.isIgnoreMethod(methodName)) { 
                    continue; 
                } 
 
 
                List<String> paramTypeNames = ReflectMethodUtil.getParamTypeNames(method); 
                String key = buildMethodKey(serviceId, methodName, paramTypeNames); 
                methodMap.put(key, method); 
            } 
        } 
 
 
        return this; 
    } 
 
 
 
 
    @Override 
    public Object invoke(String serviceId, String methodName, List<String> paramTypeNames, Object[] paramValues) { 
        //参数校验 
        ArgUtil.notEmpty(serviceId, "serviceId"); 
        ArgUtil.notEmpty(methodName, "methodName"); 
 
 
        // 提供 cache,可以根据前三个值快速定位对应的 method 
        // 根据 method 进行反射处理。 
        // 对于 paramTypes 进行 string 连接处理。 
        final Object reference = serviceMap.get(serviceId); 
        final String methodKey = buildMethodKey(serviceId, methodName, paramTypeNames); 
        final Method method = methodMap.get(methodKey); 
 
 
        try { 
            return method.invoke(reference, paramValues); 
        } catch (IllegalAccessException | InvocationTargetException e) { 
            throw new RpcRuntimeException(e); 
        } 
    } 
 
 
    /** 
     * (1)多个之间才用 : 分隔 
     * (2)参数之间采用 @ 分隔 
     * @param serviceId 服务标识 
     * @param methodName 方法名称 
     * @param paramTypeNames 参数类型名称 
     * @return 构建完整的 key 
     * @since 0.0.6 
     */ 
    private String buildMethodKey(String serviceId, String methodName, List<String> paramTypeNames) { 
        String param = CollectionUtil.join(paramTypeNames, PunctuationConst.AT); 
        return serviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON 
                +param; 
    } 
 
 
ServiceRegistry-服务注册类
接口
package com.github.houbb.rpc.server.registry; 
 
 
/** 
 * 服务注册类 
 * (1)每个应用唯一 
 * (2)每个服务的暴露协议应该保持一致 
 * 暂时不提供单个服务的特殊处理,后期可以考虑添加 
 * 
 * @author binbin.hou 
 * @since 0.0.6 
 */ 
public interface ServiceRegistry { 
 
 
    /** 
     * 暴露的 rpc 服务端口信息 
     * @param port 端口信息 
     * @return this 
     * @since 0.0.6 
     */ 
    ServiceRegistry port(final int port); 
 
 
    /** 
     * 注册服务实现 
     * @param serviceId 服务标识 
     * @param serviceImpl 服务实现 
     * @return this 
     * @since 0.0.6 
     */ 
    ServiceRegistry register(final String serviceId, final Object serviceImpl); 
 
 
    /** 
     * 暴露所有服务信息 
     * (1)启动服务端 
     * @return this 
     * @since 0.0.6 
     */ 
    ServiceRegistry expose(); 
 
 
}

(编辑:ASP站长网)

    网友评论
    推荐文章
      热点阅读