netty之分隔符和定长解码器解决之道

特此说明:我参考了李林锋老师写的《netty权威指南》一书,支持大家买正版书学习。学会了,赶紧写下来,不但为了加深记忆也希望对大家有所帮助!

上节我们讲解了LineBasedFrameDecoder和StringDecoder的使用,如果大家理解了这二个东西,那么这一章学起来将是轻车熟路。话不多说开始吧。

本章我们将讲解一下内容:

DelimiterBasedFrameDecoder(可以自动完成以分隔符做结束标志的消息解码)
FixedLengthFrameDecoder(可以自动完成对定长消息的解码)

在本章开始先给大家奉送代码,大家可以对照着学习。

基础知识

TCP在以流的方式进行数据传输中,上层的应用协议为了对消息进行区分,往往采用如下4种方式。

1.消息长度固定。当累计读取了定长(length)的报文后,我们就认为读到了一个完整的
2.将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛。
3.将特殊的分隔符作为消息的结束标志。如本章自定义的"$_"符,另外回车符就是种特殊的结束分隔符。
4.通过在消息头中定义长度字段来标识消息的总长度。

Netty很友好的对这4种应用做了统一的抽象,提供了4种解码器来解决对应的问题。大家在使用起来非常方便。我们再也不需要自己对读取的报文进行人工解码,也不需要考虑TCP的粘包和拆包。

正式开始

DelimiterBasedFrameDecoder应用开发

DelimiterBasedFrameDecoder可以自动完成以分隔符作为结束标记的消息解码。

下面我们实现一个功能:服务端收到客户端的消息后打印消息,并发送给客户端。

服务端核心代码

public class ChildChannelHandler extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
//      ch.pipeline().addLast(new FixedLengthFrameDecoder(2));
        //下面这两句就是配置以“$_”为分隔符的解码器,怎么样简单吧
        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
        //1024 是单条消息的最大长度,如果达到该长度后仍然没有找到分隔符就会抛出异常,这点大家要特别注意。delimiter就是我们的分隔符。
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));

        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new TimeServerHandler());
    }
}

解析 在initChannel方法中,我们配置了分割符“$_”,需要注意参数1024是单条消息的最大长度,如果达到该长度后仍然没有找到分隔符就会抛出异常。delimiter就是我们的分隔符。

继续看代码:

public class TimeServerHandler extends SimpleChannelInboundHandler<Object> {
    private int counter;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            String  body = (String) msg;
            //打印消息,需要注意的是客户端发过来的消息,有“$_”分隔符,但是接收到消息并没有这个分隔符,说明DelimiterBaseFrameDecoder自动对消息进行解码,并去掉了“$_”. 
            System.out.println("This is "+(++counter)+"times receive client:["+body+"]");
            body+="$_";//现在我们要把消息重新发回给客户端,所以要重新把这个“$_”分割符加上
            ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(echo);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

解析 在注释中已经解释的很清楚了,这里就不解释了。

客户端核心代码

客户端的代码和服务器端基本上一样。

public class TimeClient {
    public void connect(int port,String host) throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY,true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
//                  ch.pipeline().addLast(new FixedLengthFrameDecoder(2));
                    ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            System.out.println("");
        }finally{
            group.shutdownGracefully();
        }

    }
    /**
     * 入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        int port = 9090; //监听端口号
        new TimeClient().connect(port, "localhost");
    }
}

解析 主要看initChannel方法,我们接受到消息后,添加解码器DelimiterBasedFrameDecoder和StringDecoder,最后添加TimeClientHandler处理类,最后添加到pipeline中。

接着看代码:

public class TimeClientHandler extends SimpleChannelInboundHandler {
    private int counter;
    static final String ECHO_REQ = "Hi, myfriend , welcom to netty.$_";

    public TimeClientHandler(){
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<10;i++){
            ByteBuf echo = Unpooled.copiedBuffer(ECHO_REQ.getBytes());
            ctx.writeAndFlush(echo);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("This is:"+(++counter)+"times receive server:["+msg+"]");
    }

}

看到没,在刚开始的静态常量 ECHO_REQ 的最后有一个“$_”分隔符。在channelActive方法中,我们循环发送10条消息给服务器端。

运行结果

服务器端

This is 1times receive client:[Hi, myfriend , welcom to netty.]
This is 2times receive client:[Hi, myfriend , welcom to netty.]
This is 3times receive client:[Hi, myfriend , welcom to netty.]
This is 4times receive client:[Hi, myfriend , welcom to netty.]
This is 5times receive client:[Hi, myfriend , welcom to netty.]
This is 6times receive client:[Hi, myfriend , welcom to netty.]
This is 7times receive client:[Hi, myfriend , welcom to netty.]
This is 8times receive client:[Hi, myfriend , welcom to netty.]
This is 9times receive client:[Hi, myfriend , welcom to netty.]
This is 10times receive client:[Hi, myfriend , welcom to netty.]

客户端

This is:1times receive server:[Hi, myfriend , welcom to netty.]
This is:2times receive server:[Hi, myfriend , welcom to netty.]
This is:3times receive server:[Hi, myfriend , welcom to netty.]
This is:4times receive server:[Hi, myfriend , welcom to netty.]
This is:5times receive server:[Hi, myfriend , welcom to netty.]
This is:6times receive server:[Hi, myfriend , welcom to netty.]
This is:7times receive server:[Hi, myfriend , welcom to netty.]
This is:8times receive server:[Hi, myfriend , welcom to netty.]
This is:9times receive server:[Hi, myfriend , welcom to netty.]
This is:10times receive server:[Hi, myfriend , welcom to netty.]

客户端和服务器端分别收到的10条消息,和我们的预期一样,说明了DelimiterBasedFrameDecoder可以自动采用分隔符做码流标识的消息进行解码。

FixedLengthFrameDecoder应用开发

FixedLengthFrameDecoder 是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包问题,非常方便。下面来看代码

服务器核心代码:

public class ChildChannelHandler extends
        ChannelInitializer<SocketChannel> {

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        //重点在这条上,我设置了6个字符做为一条消息的长度。
        ch.pipeline().addLast(new FixedLengthFrameDecoder(6));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new TimeServerHandler());
    }
}

这个不做解释了很简单,看注释。

public class TimeServerHandler extends SimpleChannelInboundHandler<Object> {
    private int counter;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            String  body = (String) msg;
            System.out.println("This is "+(++counter)+"times receive client:["+body+"]");
            body+="$_";
            ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
            ctx.writeAndFlush(echo);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }
}

这个也不解释了,就是收到消息后,打印再发给客户端。

客户端核心代码:

public class TimeClient {
    public void connect(int port,String host) throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY,true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    //往这看,核心代码在这
                    ch.pipeline().addLast(new FixedLengthFrameDecoder(6));

                    ch.pipeline().addLast(new StringDecoder());
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            System.out.println("");
        }finally{
            group.shutdownGracefully();
        }

    }
    /**
     * 入口
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception{
        int port = 9090; //监听端口号
        new TimeClient().connect(port, "localhost");
    }
}

不解释,看下面代码

public class TimeClientHandler extends SimpleChannelInboundHandler {
    private int counter;
    static final String ECHO_REQ = "Hi, myfriend , welcom to netty.$_";

    public TimeClientHandler(){
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for(int i=0;i<2;i++){
            ByteBuf echo = Unpooled.copiedBuffer(ECHO_REQ.getBytes());
            ctx.writeAndFlush(echo);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("This is:"+(++counter)+"times receive server:["+msg+"]");
    }
}

不解释了,很简单,在channelActive方法中循环发送2条消息(为什么现在不发10条了呢,嘿嘿,太多了,放不开。)

有趣的东西来了,大家看打印

服务器的打印:

This is 1times receive client:[Hi, my]
This is 2times receive client:[friend]
This is 3times receive client:[ , wel]
This is 4times receive client:[com to]
This is 5times receive client:[ netty]
This is 6times receive client:[.$_Hi,]
This is 7times receive client:[ myfri]
This is 8times receive client:[end , ]
This is 9times receive client:[welcom]
This is 10times receive client:[ to ne]
This is 11times receive client:[tty.$_]

客户端打印:

This is:1times receive server:[Hi, my]
This is:2times receive server:[$_frie]
This is:3times receive server:[nd$_ ,]
This is:4times receive server:[ wel$_]
This is:5times receive server:[com to]
This is:6times receive server:[$_ net]
This is:7times receive server:[ty$_.$]
This is:8times receive server:[_Hi,$_]
This is:9times receive server:[ myfri]
This is:10times receive server:[$_end ]
This is:11times receive server:[, $_we]
This is:12times receive server:[lcom$_]
This is:13times receive server:[ to ne]
This is:14times receive server:[$_tty.]

如果你仔细看就会发现,哎?!为什么服务器收到11条消息,而客户端收到14条消息呢?在这里我就不告诉大家了,大家仔细想想。

FixedLengthFrameDecoder解析
FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的拆包/粘包问题,非常实用。无论一次收到多少数据,它都会按照我们给定的长度进行解码。如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并且等待下个包到达后进行拼包,直到读到一个完整的包。

结尾

DelimiterBasedFrameDecoder用于对使用分隔符结尾的消息进行自动解码,
FixedLengthFrameDecoder用于对固定长度的消息进行自动解码。有了这二种解码器,再结合其他的解码器,可以轻松完成对很多消息的自动解码,而且不再考虑拆包/粘包的读半包问题,极大提高了开发效率。

在本章开始先给大家奉送代码,大家可以对照着学习。

好了,就讲到这里吧。今天我们讲解了LineBasedFrameDecoder和StringDecoder的使用,因为我也是现学现卖,讲解不到的地方,希望大家谅解。

本页内容版权归属为原作者,如有侵犯您的权益,请通知我们删除。

漫谈处理器体系结构 - 2016-07-22 19:07:08

漫谈处理器体系结构 前言: 这篇博客本应该是《 深入理解计算机体系结构 》(第二版)中第一部分第4章处理器体系结构的读后感,但是感觉这样的名字有点low,因为毕竟加入了自己的理解和总结。 ISA(Instruction-Set Architecture) 几乎所有讲体系结构的书都会讲到这个指令集。指令集确实应该是最先说明的问题。一句话概括起来指令集就是说CPU能干什么事。基本常用的指令集包括:传送指令、算术逻辑指令、跳转指令等。是体系结构需要实现的功能。指令集对人类来说是友好的,可阅读的。但对于只认识01

Linux学习---文件查找 - 2016-07-22 18:07:56

grep, egrep, fgrep  :文本查找 文件查找 locate 全系统查找,非实时,模糊匹配。查找时根据全系统文件数据库进行的。 系统在每天的计划任务时间生成数据库。 updatedb  手动生成文件数据库。 速度快。 find 实时查找,精确。速度慢。 遍历指定目录中所有文件完成查找。 支持多种查找标准。 find   PATH   查找标准   找到后的处理动作           路径默认:表示当前目录           标准默认:指定路径下所有文件           动作默认:为打
在上一篇文章中,详细讲述了Spring中注册AOP解析器的过程。在这篇文章中,将进一步讲解Advisors的创建过程。 Spring中注册AOP解析器的所有操作都是针对AnnotationAwareAspectJAutoProxyCreator进行的。AnnotationAwareAspectJAutoProxyCreator是实现AOP的根本。首先观察AnnotationAwareAspectJAutoProxyCreator的类层次结构。 图1 AnnotationAwareAspectJAutoPr
 总体来说 设计模式 分为三大类:创建型模式、结构型模式和行为型模式。 博主的上一篇文章已经提到过创建型模式,此外该文章还有设计模式概况和设计模式的六大原则。设计模式的六大原则是设计模式的核心思想,详情请看博主的另外一篇文章:  Java经典设计模式之五大创建模式(附实例和详解) 。 接下来我们看看结构型模式,共七种:适配器模式、装饰器模式、代理模式、外观模式、桥接模式、组合模式、享元模式。其中适配器模式主要分为三类:类的适配器模式、对象的适配器模式、接口的适配器模式。其中的对象的适配器模式是各种结构
大话设计模式 1 面向对象的好处 可维护、可重复、可扩展 。 2 包含的角色 简单工厂模式包含三个角色: 工厂类Factory :工厂类是用来制造产品的。因此,在Factory中有一个用于制造产品的Create函数或者Generate函数之类的函数。这个函数能够根据“标识符”的不同生成不同的ConcreteProduct,当然这些ConcreteProduct都是继承自AbstractProduct的。 抽象产品类AbstractProduct :抽象产品是从其他具体产品抽象出来的。抽象产品类只有一个。
Linux常用命令 (1)ls 查看当前目录下所有目录和文件 ls -l会将目录和文件竖着排,并且可以提供文件数据 上图最左边以“d”开头的是目录,以“-”开头的是文件。后面是文件和目录的权限,后面是占了多少空间的大小,然后是创建人和所有人是谁,然后是实际当中所占空间,后面是创建的时间,最右边是目录或文件的名字。 ls -m会适合左右屏幕宽度将目录和文件列出 后面的不常用的不再介绍,用到可以去查 (2)cd 打开目录 cd /打开根目录 cd /dev 打开dev目录(绝对路径) cd /之后再cd dev

Web Service学习总结 - 2016-07-22 18:07:07

Web service到底是什么, 在什么情况下你应该使用Web service:      研究一下当前的应用开发程序 ,你会发现一个绝对的倾向:人们开始偏爱基于浏览器 的客户端应用程序。这当然不是因为客户端能够提供更好的用户界面,而是因为它能够避免花在桌面应用程序 发布上的高成本。发布桌面应用程序 成本很高,一半是因为应用程序安装和配置的问题,另一半是因为客户端和服务器之间通信的问题。      传统的Windows客户应用程序使用DCOM来与服务器进行通信和调用远程对象。配置好DCOM使其在一个大型
咱们不搞一开始就一大堆理论知识介绍,怕把人讲懵了...... 咱们换一个思维方式——"从现象看本质",先说说我们看到了什么,再从看到的现象中提出问题,最后深入寻找答案。 我们看到的 cookie 我自己创建了一个网站,网址为 http://ppsc.sankuai.com 。在这个网页中我设置了几个 cookie : JSSESSIONID , PA_VTIME , skmtutc , test 。 在 chrome 浏览器中打开这个网站,进入开发者模式,点击 Resources 栏 - 选择 cooki
MyBatis 的配置文件包含了影响 MyBatis 行为甚深的设置(settings)和属性(properties)信息。 文档顶层结构: configuration 配置           properties 属性           settings 设置           typeAliases 类型命名           typeHandlers 类型处理器           objectFactory 对象工厂           plugins 插件           envir
大话设计模式 1 策略模式UML图 2 策略模式的概念 策略模式(Strategy) :它定义了算法家族,分别封装起来,让它们之间可以相互替换,此模式让算法的变化,不会影响到算法的客户。【DP】 3 策略模式和简单工厂模式的比较 简单工厂模式需要让客户端认识 两个类 ,而策略模式和简单工厂结合的用法,客户端就只需要认识 一个类 就可以了,耦合度更低。 4 策略模式解析 策略模式是一种定义一系列算法的方法,从概念上来看,所有这些算法完成的都是相同的工作,只是实现不同,他可以以相同的方式调用所有的算法,减少了