设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 数据 创业者 手机
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

Java从零开启手写RPC-序列化

发布时间:2021-11-04 09:50 所属栏目:51 来源:互联网
导读:前面几节我们实现了最基础的客户端调用服务端,这一节来学习一下通讯中的对象序列化。 为什么需要序列化 netty 底层都是基于 ByteBuf 进行通讯的。 前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。 但是有一个问题,
前面几节我们实现了最基础的客户端调用服务端,这一节来学习一下通讯中的对象序列化。
 
 
 
为什么需要序列化
netty 底层都是基于 ByteBuf 进行通讯的。
 
前面我们通过编码器/解码器专门为计算的入参/出参进行处理,这样方便我们直接使用 pojo。
 
但是有一个问题,如果想把我们的项目抽象为框架,那就需要为所有的对象编写编码器/解码器。
 
显然,直接通过每一个对象写一对的方式是不现实的,而且用户如何使用,也是未知的。
 
序列化的方式
基于字节的实现,性能好,可读性不高。
 
基于字符串的实现,比如 json 序列化,可读性好,性能相对较差。
 
ps: 可以根据个人还好选择,相关序列化可参考下文,此处不做展开。
 
json 序列化框架简介[1]
 
实现思路
可以将我们的 Pojo 全部转化为 byte,然后 Byte 转换为 ByteBuf 即可。
 
反之亦然。
 
代码实现
maven
引入序列化包:
 
 
 
<dependency> 
    <groupId>com.github.houbb</groupId> 
    <artifactId>json</artifactId> 
    <version>0.1.1</version> 
</dependency> 
服务端
核心
服务端的代码可以大大简化:
 
serverBootstrap.group(workerGroup, bossGroup) 
    .channel(NioServerSocketChannel.class) 
    // 打印日志 
    .handler(new LoggingHandler(LogLevel.INFO)) 
    .childHandler(new ChannelInitializer<Channel>() { 
        @Override 
        protected void initChannel(Channel ch) throws Exception { 
            ch.pipeline() 
                    .addLast(new RpcServerHandler()); 
        } 
    }) 
    // 这个参数影响的是还没有被accept 取出的连接 
    .option(ChannelOption.SO_BACKLOG, 128) 
    // 这个参数只是过一段时间内客户端没有响应,服务端会发送一个 ack 包,以判断客户端是否还活着。 
    .childOption(ChannelOption.SO_KEEPALIVE, true); 
这里只需要一个实现类即可。
 
RpcServerHandler
服务端的序列化/反序列化调整为直接使用 JsonBs 实现。
 
package com.github.houbb.rpc.server.handler; 
 
 
import com.github.houbb.json.bs.JsonBs; 
import com.github.houbb.log.integration.core.Log; 
import com.github.houbb.log.integration.core.LogFactory; 
import com.github.houbb.rpc.common.model.CalculateRequest; 
import com.github.houbb.rpc.common.model.CalculateResponse; 
import com.github.houbb.rpc.common.service.Calculator; 
import com.github.houbb.rpc.server.service.CalculatorService; 
 
 
import io.netty.buffer.ByteBuf; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
 
 
/** 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class RpcServerHandler extends SimpleChannelInboundHandler { 
 
 
    private static final Log log = LogFactory.getLog(RpcServerHandler.class); 
 
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        final String id = ctx.channel().id().asLongText(); 
        log.info("[Server] channel {} connected " + id); 
    } 
 
 
    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 
        final String id = ctx.channel().id().asLongText(); 
 
 
        ByteBuf byteBuf = (ByteBuf)msg; 
        byte[] bytes = new byte[byteBuf.readableBytes()]; 
        byteBuf.readBytes(bytes); 
        CalculateRequest request = JsonBs.deserializeBytes(bytes, CalculateRequest.class); 
        log.info("[Server] receive channel {} request: {} from ", id, request); 
 
 
        Calculator calculator = new CalculatorService(); 
        CalculateResponse response = calculator.sum(request); 
 
 
        // 回写到 client 端 
        byte[] responseBytes = JsonBs.serializeBytes(response); 
        ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes); 
        ctx.writeAndFlush(responseBuffer); 
        log.info("[Server] channel {} response {}", id, response); 
    } 
 
 
客户端
核心
客户端可以简化如下:
 
channelFuture = bootstrap.group(workerGroup) 
    .channel(NioSocketChannel.class) 
    .option(ChannelOption.SO_KEEPALIVE, true) 
    .handler(new ChannelInitializer<Channel>(){ 
        @Override 
        protected void initChannel(Channel ch) throws Exception { 
            channelHandler = new RpcClientHandler(); 
            ch.pipeline() 
                    .addLast(new LoggingHandler(LogLevel.INFO)) 
                    .addLast(channelHandler); 
        } 
    }) 
    .connect(RpcConstant.ADDRESS, port) 
    .syncUninterruptibly(); 

(编辑:ASP站长网)

    网友评论
    推荐文章
      热点阅读